GraphX is Apache Spark's API for graphs and graph-parallel computation. It provides a distributed graph processing framework built on top of Spark RDDs, offering both graph-parallel and data-parallel views of the same physical data. GraphX lets users move between graph structures and tabular data, which suits ETL, exploratory analysis, and iterative graph computation.

npx @tessl/cli install tessl/maven-org-apache-spark--spark-graphx_2-12@3.5.0
build.sbt: libraryDependencies += "org.apache.spark" %% "spark-graphx" % "3.5.6"

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel

For specific imports:
import org.apache.spark.graphx.{Graph, VertexId, Edge, EdgeTriplet}
import org.apache.spark.graphx.{VertexRDD, EdgeRDD, GraphOps}
import org.apache.spark.graphx.{PartitionStrategy, EdgeDirection}

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Assumes an existing SparkContext named `sc` (for example, in spark-shell)

// Create vertices RDD: (VertexId, VertexAttribute)
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie")
))

// Create edges RDD: Edge(srcId, dstId, attr)
val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 1L, "friend")
))

// Build the graph
val graph: Graph[String, String] = Graph(vertices, edges)

// Basic operations
println(s"Vertices: ${graph.numVertices}")
println(s"Edges: ${graph.numEdges}")

// Transform vertex attributes (returns a new graph; `graph` is unchanged)
val transformedGraph = graph.mapVertices((id, attr) => attr.toUpperCase)

// Run PageRank until the ranks converge within a tolerance of 0.0001
val ranks = graph.pageRank(0.0001).vertices

GraphX is built around several key components:
VertexRDD and EdgeRDD provide efficient graph-specific operations.
EdgeTriplet joins edges with adjacent vertex attributes for message passing.

Fundamental graph construction, transformation, and analysis operations for building and manipulating graph structures.
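As a rough illustration of how an EdgeTriplet exposes an edge together with the attributes of both endpoints, here is a minimal sketch. The vertex names, the edge labels, the application name, and the local master setting are assumptions made for the example; `SparkContext.getOrCreate` is used so the snippet reuses a context if one already exists.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Local context for illustration only (assumed settings)
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("TripletSketch"))

val vertices: RDD[(VertexId, String)] =
  sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Charlie")))
val edges: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "friend"), Edge(2L, 3L, "colleague")))

val graph: Graph[String, String] = Graph(vertices, edges)

// Each triplet carries the edge attribute plus srcAttr and dstAttr
val sentences: Array[String] = graph.triplets
  .map(t => s"${t.srcAttr} is a ${t.attr} of ${t.dstAttr}")
  .collect()
  .sorted

sentences.foreach(println)
```

The triplet view avoids writing an explicit join between the edge RDD and the vertex RDD when both endpoint attributes are needed.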
// Graph construction (methods on the Graph companion object)
def Graph.apply[VD: ClassTag, ED: ClassTag](
  vertices: RDD[(VertexId, VD)],
  edges: RDD[Edge[ED]]
): Graph[VD, ED]

def Graph.fromEdges[VD: ClassTag, ED: ClassTag](
  edges: RDD[Edge[ED]],
  defaultValue: VD
): Graph[VD, ED]

// Graph transformations (methods on a Graph[VD, ED] instance)
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

Comprehensive collection of graph algorithms including PageRank, Connected Components, Triangle Counting, and community detection.
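A minimal sketch of the built-in algorithms on a toy graph may help here. The five-vertex graph (one triangle plus one separate edge), the application name, and the local master setting are assumptions chosen only to keep the results easy to check by hand.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Local context for illustration only (assumed settings)
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("AlgorithmSketch"))

// Triangle 1 -> 2 -> 3 -> 1, plus a separate edge 4 -> 5
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1), Edge(4L, 5L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 0)

// PageRank: iterate until ranks change by less than the tolerance
val ranks = graph.pageRank(0.0001).vertices.collect().toMap

// Connected components: each vertex is labeled with the lowest
// vertex ID in its component
val components = graph.connectedComponents().vertices.collect().toMap

// Triangle count: number of triangles passing through each vertex
val triangles = graph.triangleCount().vertices.collect().toMap

println(s"components: $components")
println(s"triangles:  $triangles")
```

Each call returns a new graph whose vertex attribute holds the algorithm's result, so `.vertices` is the usual way to read it back.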
// PageRank algorithms
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
// Component algorithms
def connectedComponents(): Graph[VertexId, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
// Community detection
def triangleCount(): Graph[Int, ED]

Vertex-centric programming framework for implementing custom iterative graph algorithms using the Pregel computational model.
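To make the vertex-centric model concrete, the sketch below computes single-source shortest paths with the Pregel operator. The edge weights, the choice of vertex 1 as the source, the application name, and the local master setting are assumptions for the example; this is one common use of the operator, not the only way to structure such a computation.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

// Local context for illustration only (assumed settings)
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("PregelSketch"))

// Weighted edges; the direct edge 1 -> 3 costs more than the path via 2
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0), Edge(3L, 4L, 1.0)))
val sourceId: VertexId = 1L

// Distance 0 at the source, infinity everywhere else
val initialGraph = Graph.fromEdges(edges, 0.0)
  .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program: keep the shorter of the current and incoming distance
  (id, dist, newDist) => math.min(dist, newDist),
  // Send a message only when the edge offers a shorter path to dstId
  triplet => {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },
  // Merge messages arriving at the same vertex: take the minimum
  (a, b) => math.min(a, b)
)

val distances = sssp.vertices.collect().toMap
println(distances)
```

The computation stops on its own once no vertex receives a message, which is why `maxIterations` can be left at its default here.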
def pregel[A: ClassTag](
  initialMsg: A,
  maxIterations: Int = Int.MaxValue,
  activeDirection: EdgeDirection = EdgeDirection.Either
)(
  vprog: (VertexId, VD, A) => VD,
  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  mergeMsg: (A, A) => A
): Graph[VD, ED]

Graph loading, generation, and utility functions for creating test graphs, importing data, and performance optimization.
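As a hedged sketch of loading and generating graphs, the example below writes a small edge list to a temporary file, loads it with `GraphLoader.edgeListFile`, and then generates a synthetic graph with `GraphGenerators.logNormalGraph`. The file contents, the vertex count of 20, the application name, and the local master setting are assumptions for the example.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphLoader
import org.apache.spark.graphx.util.GraphGenerators

// Local context for illustration only (assumed settings)
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("LoaderSketch"))

// Edge list format: one "srcId dstId" pair per line; lines starting with # are skipped
val path = Files.createTempFile("edges", ".txt")
Files.write(path, "# src dst\n1 2\n2 3\n3 1\n".getBytes(StandardCharsets.UTF_8))

// Vertex and edge attributes are both Int in the loaded graph
val loaded = GraphLoader.edgeListFile(sc, path.toUri.toString)
println(s"loaded: ${loaded.numVertices} vertices, ${loaded.numEdges} edges")

// Synthetic graph whose out-degrees follow a log-normal distribution
val generated = GraphGenerators.logNormalGraph(sc, numVertices = 20)
println(s"generated: ${generated.numVertices} vertices, ${generated.numEdges} edges")
```

Generated graphs are handy for testing an algorithm before real data is available; the edge count varies from run to run unless a seed is supplied.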
// Graph loading
def GraphLoader.edgeListFile(
  sc: SparkContext,
  path: String,
  canonicalOrientation: Boolean = false,
  numEdgePartitions: Int = -1
): Graph[Int, Int]

// Graph generation
def GraphGenerators.logNormalGraph(
  sc: SparkContext,
  numVertices: Int,
  numEParts: Int = 0,
  mu: Double = 4.0,
  sigma: Double = 1.3
): Graph[Long, Int]

// Type aliases
type VertexId = Long
type PartitionID = Int
// Core data structures
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
class EdgeTriplet[VD, ED] extends Edge[ED] {
  val srcAttr: VD
  val dstAttr: VD
}

abstract class Graph[VD: ClassTag, ED: ClassTag] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
}

abstract class VertexRDD[VD] extends RDD[(VertexId, VD)]
abstract class EdgeRDD[ED] extends RDD[Edge[ED]]

Graph Construction from Data:
// From edge list file (one "srcId dstId" pair per line)
val graphFromFile = GraphLoader.edgeListFile(sc, "path/to/edges.txt")

// From existing RDDs
val graphFromRdds = Graph(verticesRDD, edgesRDD)

// From edge tuples (RDD[(VertexId, VertexId)]) with default vertex values
val graphFromTuples = Graph.fromEdgeTuples(edgeTuples, defaultValue = "Unknown")

Performance Optimization:
// Cache for iterative algorithms
val cachedGraph = graph.cache()
// Partition for better locality
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)
// Checkpoint for fault tolerance; requires sc.setCheckpointDir(...) to be called first
graph.checkpoint()
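
The three optimizations above can be combined as in the following sketch. The temporary checkpoint directory, the toy edge list, the application name, and the local master setting are assumptions for the example; on a cluster the checkpoint directory would normally be on a shared file system such as HDFS.

```scala
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy}

// Local context for illustration only (assumed settings)
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("OptimizationSketch"))

// A checkpoint directory must be set before checkpoint() is called
val checkpointDir = Files.createTempDirectory("graphx-checkpoint")
sc.setCheckpointDir(checkpointDir.toUri.toString)

val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(3L, 1L, 1)))

// partitionBy and cache both return a graph, so the calls can be chained
val graph = Graph.fromEdges(edges, defaultValue = 0)
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .cache()

// Marks the graph for checkpointing; the data is written when an
// action first materializes the underlying RDDs
graph.checkpoint()

val edgeCount = graph.numEdges
val vertexCount = graph.numVertices
println(s"$vertexCount vertices, $edgeCount edges")
```

Caching before checkpointing is the usual order, since otherwise the graph may be recomputed when the checkpoint is written.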