Graph loading, generation, and utility functions for creating test graphs, importing data, optimizing performance, and working with GraphX efficiently.
Load graphs from various file formats and data sources.
object GraphLoader {
/**
* Load a graph from an edge list file
* @param sc SparkContext
* @param path Path to edge list file (each line: "srcId dstId" or "srcId dstId weight")
* @param canonicalOrientation Whether to orient edges canonically (srcId < dstId)
* @param numEdgePartitions Number of edge partitions (-1 for default)
* @param edgeStorageLevel Storage level for edges
* @param vertexStorageLevel Storage level for vertices
* @return Graph with integer vertex and edge attributes
*/
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
numEdgePartitions: Int = -1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[Int, Int]
}

Usage Examples:
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel
// Load graph from edge list file
// File format: each line contains "srcId dstId" or "srcId dstId weight"
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")
// Load with canonical orientation (srcId < dstId)
val canonicalGraph = GraphLoader.edgeListFile(
sc,
"path/to/edges.txt",
canonicalOrientation = true
)
// Load with custom partitioning and storage
val optimizedGraph = GraphLoader.edgeListFile(
sc,
"hdfs://cluster/large-graph.txt",
canonicalOrientation = false,
numEdgePartitions = 100,
edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
vertexStorageLevel = StorageLevel.MEMORY_ONLY
)

Generate synthetic graphs for testing, benchmarking, and algorithm development.
object GraphGenerators {
/** Default RMAT parameters for realistic graph generation */
val RMATa: Double = 0.45
val RMATb: Double = 0.15
val RMATd: Double = 0.25
// RMATc = 1.0 - RMATa - RMATb - RMATd = 0.15
/**
* Generate log-normal degree distribution graph
* @param sc SparkContext
* @param numVertices Number of vertices
* @param numEParts Number of edge partitions (-1 for default)
* @param mu Mean of underlying normal distribution
* @param sigma Standard deviation of underlying normal distribution
* @param seed Random seed for reproducibility
* @return Graph with Long vertex attributes and Int edge attributes
*/
def logNormalGraph(
sc: SparkContext,
numVertices: Int,
numEParts: Int = -1,
mu: Double = 4.0,
sigma: Double = 1.3,
seed: Long = -1
): Graph[Long, Int]
/**
* Generate R-MAT graph with realistic structure
* @param sc SparkContext
* @param requestedNumVertices Desired number of vertices (rounded up to the next power of 2)
* @param numEdges Number of edges to generate
* @param a Probability of edge in top-left quadrant
* @param b Probability of edge in top-right quadrant
* @param c Probability of edge in bottom-left quadrant
* @param d Probability of edge in bottom-right quadrant (a + b + c + d must sum to 1.0)
* @param seed Random seed
* @param numEParts Number of edge partitions
* @return R-MAT graph
*/
def rmatGraph(
sc: SparkContext,
requestedNumVertices: Int,
numEdges: Int,
a: Double = RMATa,
b: Double = RMATb,
c: Double = 1.0 - RMATa - RMATb - RMATd,
d: Double = RMATd,
seed: Long = -1,
numEParts: Int = -1
): Graph[Int, Int]
/**
* Generate star graph (one central vertex connected to all others)
* @param sc SparkContext
* @param nverts Number of vertices (including center)
* @param numEParts Number of edge partitions
* @return Star graph with center at vertex 0
*/
def starGraph(
sc: SparkContext,
nverts: Int,
numEParts: Int = -1
): Graph[Int, Int]
/**
* Generate 2D grid graph
* @param sc SparkContext
* @param rows Number of rows in grid
* @param cols Number of columns in grid
* @return Grid graph with vertices connected to adjacent cells
*/
def gridGraph(
sc: SparkContext,
rows: Int,
cols: Int
): Graph[(Int, Int), Double]
}

Usage Examples:
import org.apache.spark.graphx.util.GraphGenerators
// Generate log-normal degree distribution graph (realistic social networks)
val socialGraph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)
// Generate R-MAT graph with default parameters
val rmatGraph = GraphGenerators.rmatGraph(sc, requestedNumVertices = 1024, numEdges = 5000)
// Generate custom R-MAT with different parameters
val customRMAT = GraphGenerators.rmatGraph(
sc,
requestedNumVertices = 2048,
numEdges = 10000,
a = 0.57, b = 0.19, c = 0.19, d = 0.05 // More skewed distribution
)
// Generate star graph for testing centrality algorithms
val starGraph = GraphGenerators.starGraph(sc, nverts = 100)
// Generate grid graph for spatial algorithms
val gridGraph = GraphGenerators.gridGraph(sc, rows = 10, cols = 10)
// Use generated graphs for testing
val pageRanks = rmatGraph.pageRank(0.001).vertices
val components = socialGraph.connectedComponents().vertices

Utility functions for graph optimization, serialization, and performance tuning.
object GraphXUtils {
/**
* Register GraphX classes with Kryo serialization for better performance
* @param conf SparkConf to modify
*/
def registerKryoClasses(conf: SparkConf): Unit
}
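Call this on the SparkConf before creating the SparkContext so the registrations take effect; a minimal sketch (the application name is a placeholder, and a fuller configuration example appears at the end of this section):
import org.apache.spark.SparkConf
import org.apache.spark.graphx.GraphXUtils
val conf = new SparkConf()
  .setAppName("graphx-app") // placeholder name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
GraphXUtils.registerKryoClasses(conf) // registers GraphX's internal classes with Kryo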
class PeriodicGraphCheckpointer[VD: ClassTag, ED: ClassTag](
checkpointInterval: Int,
sc: SparkContext
) {
/**
* Update the graph, managing checkpointing and persistence automatically
* @param graph New graph to manage
*/
def update(graph: Graph[VD, ED]): Unit
/**
* Checkpoint the current graph if needed
*/
def checkpoint(): Unit
/**
* Clean up all cached/checkpointed graphs
*/
def deleteAllCheckpoints(): Unit
}
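A minimal sketch of the call pattern for an iterative computation; initialGraph and the per-iteration transform are placeholders, a checkpoint directory must be configured, and note that this class is package-private in the Spark source, so treat this as the pattern rather than a guaranteed public API:
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.PeriodicGraphCheckpointer
sc.setCheckpointDir("hdfs://cluster/checkpoints")
val checkpointer = new PeriodicGraphCheckpointer[Double, Int](10, sc) // checkpoint every 10th update
var current: Graph[Double, Int] = initialGraph // placeholder starting graph
checkpointer.update(current)
for (_ <- 1 to 50) {
  current = current.mapVertices((_, attr) => attr * 0.85) // placeholder transform
  checkpointer.update(current) // persists the new graph; checkpoints every 10th update
}
checkpointer.deleteAllCheckpoints()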
object BytecodeUtils {
/**
* Test whether a closure invokes a specific method (for optimization)
* @param closure Function closure to analyze
* @param targetClass Class containing the target method
* @param targetMethod Name of method to check for
* @return Whether the closure calls the target method
*/
def invokedMethod(
closure: AnyRef,
targetClass: Class[_],
targetMethod: String
): Boolean
}
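GraphX has used this internally to detect which triplet fields a closure reads; an illustrative sketch (the object is package-private in the Spark source, so this is conceptual rather than user-facing):
val sendMsg = (ctx: EdgeContext[Int, Int, Int]) => ctx.sendToDst(1)
val readsSrcAttr = BytecodeUtils.invokedMethod(sendMsg, classOf[EdgeContext[Int, Int, Int]], "srcAttr")
// readsSrcAttr is false: the closure never touches srcAttr, so it need not be shipped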

Different strategies for distributing edges across partitions to optimize performance.

trait PartitionStrategy {
/**
* Determine which partition an edge should be assigned to
* @param src Source vertex ID
* @param dst Destination vertex ID
* @param numParts Total number of partitions
* @return Partition ID for this edge
*/
def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}
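Custom strategies are ordinary implementations of this trait; a minimal sketch (SourceHashStrategy is a hypothetical name, and it essentially mirrors EdgePartition1D without its prime-number mixing):
import org.apache.spark.graphx._
// Hypothetical strategy: assign each edge by its source vertex only
case object SourceHashStrategy extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    ((src % numParts + numParts) % numParts).toInt // non-negative modulo of the source ID
  }
}
// Used like any built-in strategy: graph.partitionBy(SourceHashStrategy)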
object PartitionStrategy {
/**
* 2D edge partitioning with a vertex replication bound of 2√numParts
* Provides good load balancing and communication efficiency
*/
val EdgePartition2D: PartitionStrategy
/**
* Hash partitioning by source vertex only
* Simple but can lead to load imbalance
*/
val EdgePartition1D: PartitionStrategy
/**
* Random vertex cut from hashing the (src, dst) pair; colocates all
* same-direction edges between each pair of vertices
*/
val RandomVertexCut: PartitionStrategy
/**
* Random partitioning that colocates all edges between vertex pairs
* Reduces communication for algorithms using both edge directions
*/
val CanonicalRandomVertexCut: PartitionStrategy
/**
* Get partition strategy by string name
* @param s Strategy name ("EdgePartition1D", "EdgePartition2D", etc.)
* @return Corresponding PartitionStrategy
*/
def fromString(s: String): PartitionStrategy
}

Usage Examples:
// Optimize graph partitioning for better performance
val optimizedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)
// Use different strategies based on graph characteristics
val strategy = if (graph.numVertices > 1000000) {
PartitionStrategy.EdgePartition2D // Better for large graphs
} else {
PartitionStrategy.RandomVertexCut // Simpler for small graphs
}
val partitionedGraph = graph.partitionBy(strategy)
// Get strategy from configuration
val strategyName = "EdgePartition1D"
val configuredStrategy = PartitionStrategy.fromString(strategyName)
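The returned strategy plugs directly into partitionBy, so the choice can be driven by configuration; continuing the snippet above:
val repartitionedGraph = graph.partitionBy(configuredStrategy)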

Helper functions and patterns for optimizing GraphX performance.

class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
/**
* Convert multiple edges between vertices to single edges
* @param merge Function to combine edge attributes
* @return Graph with merged parallel edges
*/
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
/**
* Remove self-loops (edges from vertex to itself)
* @return Graph without self-loops
*/
def removeSelfEdges(): Graph[VD, ED]
/**
* Pick a random vertex ID from the graph
* @return Random vertex ID
*/
def pickRandomVertex(): VertexId
/**
* Filter the graph using predicates evaluated on a preprocessed view of the graph
* @param preprocess Function computing temporary vertex and edge data to filter on
* @param epred Edge predicate
* @param vpred Vertex predicate
* @return Filtered graph with the original vertex and edge attributes
*/
def filter[VD2: ClassTag, ED2: ClassTag](
preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: EdgeTriplet[VD2, ED2] => Boolean,
vpred: (VertexId, VD2) => Boolean
): Graph[VD, ED]
}
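These operators are typically chained to clean up a multigraph before running algorithms; a short sketch assuming Double edge weights (groupEdges only merges parallel edges that are colocated, hence the partitionBy first):
val cleaned = graph
  .partitionBy(PartitionStrategy.RandomVertexCut) // colocate parallel edges
  .groupEdges((w1, w2) => w1 + w2) // merge duplicates by summing weights
  .removeSelfEdges()
val startVertex: VertexId = cleaned.pickRandomVertex() // e.g. a seed for sampled traversals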

Control which triplet fields are accessed to optimize message-passing performance.

class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean) {
// Java class for specifying which EdgeTriplet/EdgeContext fields are accessed
}
object TripletFields {
/** No fields are accessed */
val None: TripletFields
/** Only edge attribute is accessed */
val EdgeOnly: TripletFields
/** Source vertex and edge attributes are accessed */
val Src: TripletFields
/** Destination vertex and edge attributes are accessed */
val Dst: TripletFields
/** All fields are accessed (default) */
val All: TripletFields
}

Usage Examples:
// Optimize aggregateMessages with TripletFields
val inDegrees = graph.aggregateMessages[Int](
sendMsg = ctx => ctx.sendToDst(1), // Only sending to destination
mergeMsg = (a, b) => a + b,
tripletFields = TripletFields.None // No triplet fields needed
)
val weightedInDegrees = graph.aggregateMessages[Double](
sendMsg = ctx => ctx.sendToDst(ctx.attr), // Using edge attribute
mergeMsg = (a, b) => a + b,
tripletFields = TripletFields.EdgeOnly // Only edge attribute needed
)
val neighborSum = graph.aggregateMessages[Double](
sendMsg = ctx => ctx.sendToDst(ctx.srcAttr), // Using source attribute
mergeMsg = (a, b) => a + b,
tripletFields = TripletFields.Src // Only source attribute needed
)

// Efficient graph construction for large datasets
def buildLargeGraph(vertices: RDD[(VertexId, String)], edges: RDD[Edge[Double]]): Graph[String, Double] = {
  // Spread edges across 100 partitions up front; RDD.partitionBy only exists on
  // key-value RDDs, so repartition is used for an RDD[Edge]
  val partitionedEdges = edges.repartition(100)
  // Use appropriate storage levels
  val graph = Graph(
    vertices,
    partitionedEdges,
    defaultVertexAttr = "Unknown",
    edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
    vertexStorageLevel = StorageLevel.MEMORY_ONLY
  )
  // Apply an efficient edge-partitioning strategy and cache the result
  graph.partitionBy(PartitionStrategy.EdgePartition2D).cache()
}

// Optimize for iterative algorithms
def optimizeForIterativeAlgorithms[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): Graph[VD, ED] = {
  val optimized = graph
    .partitionBy(PartitionStrategy.EdgePartition2D) // Better load balancing
    .cache() // Cache for multiple iterations
  // Graph.checkpoint returns Unit, so checkpoint the cached graph and then return it
  // (requires sc.setCheckpointDir to have been called)
  optimized.checkpoint()
  optimized
}
// Use with iterative algorithms
val optimizedGraph = optimizeForIterativeAlgorithms(graph)
val pageRanks = optimizedGraph.pageRank(0.001)
val components = optimizedGraph.connectedComponents()
// Clean up when done
optimizedGraph.unpersist()

// Manage memory usage for large graphs
def memoryEfficientProcessing[VD: ClassTag, ED: ClassTag](
graph: Graph[VD, ED]
): Unit = {
// Use serialized storage for large graphs
val efficientGraph = graph.persist(StorageLevel.MEMORY_AND_DISK_SER)
try {
// Process graph
val results = efficientGraph.pageRank(0.001)
// Process results immediately
results.vertices.foreachPartition { iter =>
iter.foreach { case (id, rank) =>
// Process each result
}
}
} finally {
// Always clean up
efficientGraph.unpersist(blocking = false)
}
}

// Configure Spark for optimal GraphX performance
def configureSparkForGraphX(appName: String): SparkContext = {
  val conf = new SparkConf()
    .setAppName(appName)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.locality.wait", "0s") // Favor immediate scheduling over locality for iterative jobs
  // Register GraphX classes with Kryo
  // (supersedes the deprecated GraphKryoRegistrator-based spark.kryo.registrator setting)
  GraphXUtils.registerKryoClasses(conf)
  new SparkContext(conf)
}

// Validate graph structure and properties
def validateGraph[VD, ED](graph: Graph[VD, ED]): Unit = {
  println(s"Vertices: ${graph.numVertices}")
  println(s"Edges: ${graph.numEdges}")
  // Check for self-loops
  val selfLoops = graph.edges.filter(e => e.srcId == e.dstId).count()
  println(s"Self-loops: $selfLoops")
  // Check the degree distribution (cache, since two actions run over it)
  val degrees = graph.degrees.map(_._2).cache()
  val maxDegree = degrees.max()
  val avgDegree = degrees.mean()
  println(s"Max degree: $maxDegree, Average degree: $avgDegree")
  degrees.unpersist()
  // Check connectivity
  val components = graph.connectedComponents().vertices.map(_._2).distinct().count()
  println(s"Connected components: $components")
}

// Convert between different graph representations
def convertEdgeListToAdjacencyList[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): RDD[(VertexId, Array[VertexId])] = {
  // collectNeighborIds works for any vertex attribute type; the ClassTag bounds
  // are required for the implicit conversion to GraphOps
  graph.collectNeighborIds(EdgeDirection.Out)
}
def saveGraphToEdgeList[VD, ED](graph: Graph[VD, ED], path: String): Unit = {
  graph.edges
    .map(edge => s"${edge.srcId} ${edge.dstId}") // note: edge attributes are not written
    .saveAsTextFile(path)
}
def loadGraphFromAdjacencyList(sc: SparkContext, path: String): Graph[Int, Int] = {
  // Each line: "srcId neighbor1 neighbor2 ..."; skip blank lines
  val adjacencyList = sc.textFile(path).map(_.trim).filter(_.nonEmpty).map { line =>
    val parts = line.split("\\s+")
    val src = parts(0).toLong
    val neighbors = parts.tail.map(_.toLong)
    (src, neighbors)
  }
  val edges = adjacencyList.flatMap { case (src, neighbors) =>
    neighbors.map(dst => Edge(src, dst, 1))
  }
  // Vertices that never appear in any edge are dropped by fromEdges
  Graph.fromEdges(edges, defaultValue = 0)
}
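A quick round-trip sanity check of these helpers (paths are placeholders; GraphLoader.edgeListFile reads the directory of part files that saveAsTextFile produces):
saveGraphToEdgeList(graph, "hdfs://cluster/tmp/edge-list")
val reloaded = GraphLoader.edgeListFile(sc, "hdfs://cluster/tmp/edge-list")
println(s"Edges saved: ${graph.numEdges}, edges reloaded: ${reloaded.numEdges}")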