maven-apache-spark

Describes: maven/org.apache.spark/spark-parent

- Description: Lightning-fast unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R
- Author: tessl
- Spec files: docs/graphx.md
# GraphX - Graph Processing

GraphX is Apache Spark's API for graphs and graph-parallel computation. It extends the Spark RDD abstraction with a Resilient Distributed Property Graph: a directed multigraph with properties attached to each vertex and edge.

## Core Graph Abstractions

### Graph Class

The central abstraction in GraphX:

```scala { .api }
abstract class Graph[VD: ClassTag, ED: ClassTag] extends Serializable {
  // Core properties
  def vertices: VertexRDD[VD]
  def edges: EdgeRDD[ED]
  def triplets: RDD[EdgeTriplet[VD, ED]]

  // Structural operations
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = x => true, vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Transformation operations
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
```

### VertexId Type

```scala { .api }
type VertexId = Long
```

### Edge Class

Represents a directed edge in the graph:

```scala { .api }
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED) extends Serializable
```

### EdgeTriplet Class

Joins vertex and edge data:

```scala { .api }
class EdgeTriplet[VD, ED] extends Edge[ED] {
  def srcId: VertexId  // Source vertex ID
  def dstId: VertexId  // Destination vertex ID
  def attr: ED         // Edge attribute
  def srcAttr: VD      // Source vertex attribute
  def dstAttr: VD      // Destination vertex attribute
}
```

## Creating Graphs

### Graph Construction

**Graph.apply**: Create a graph from vertices and edges

```scala { .api }
object Graph {
  def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED]
}
```

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create vertices RDD
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie"),
  (4L, "David")
))

// Create edges RDD
val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 4L, "colleague"),
  Edge(1L, 4L, "colleague")
))

// Create the graph; vertices referenced by edges but missing from the
// vertices RDD receive the default attribute
val graph = Graph(vertices, edges, defaultVertexAttr = "Unknown")
```

**Graph.fromEdges**: Create a graph from edges only

```scala { .api }
def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]], defaultValue: VD): Graph[VD, ED]
```

```scala
// Create a graph from edges (vertices are inferred from edge endpoints)
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "follows"),
  Edge(2L, 3L, "follows"),
  Edge(3L, 1L, "follows")
))

val socialGraph = Graph.fromEdges(relationships, defaultValue = "user")
```

**Graph.fromEdgeTuples**: Create an unweighted graph from tuples

```scala { .api }
def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int]
```

```scala
// Simple edge list as tuples; every edge gets the attribute 1
val edgeTuples: RDD[(VertexId, VertexId)] = sc.parallelize(Array(
  (1L, 2L), (2L, 3L), (3L, 1L), (1L, 3L)
))

val simpleGraph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 1)
```
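The `uniqueEdges` parameter controls deduplication. Passing `Some(partitionStrategy)` colocates and merges identical tuples; the resulting `Int` edge attribute then records how many duplicates were merged. A minimal sketch reusing `edgeTuples` from above (the value name is ours):

```scala
// Merge duplicate tuples; each edge's Int attribute becomes the number
// of identical tuples collapsed into it
val dedupedGraph = Graph.fromEdgeTuples(
  edgeTuples,
  defaultValue = 1,
  uniqueEdges = Some(PartitionStrategy.RandomVertexCut)
)
```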
## Graph Properties and Operations

### Basic Properties

```scala
val numVertices = graph.vertices.count()
val numEdges = graph.edges.count()

println(s"Graph has $numVertices vertices and $numEdges edges")

// Access vertices and edges
graph.vertices.collect().foreach { case (id, attr) =>
  println(s"Vertex $id: $attr")
}

graph.edges.collect().foreach { edge =>
  println(s"Edge ${edge.srcId} -> ${edge.dstId}: ${edge.attr}")
}

// Access triplets (vertex-edge-vertex views)
graph.triplets.collect().foreach { triplet =>
  println(s"${triplet.srcAttr} -${triplet.attr}-> ${triplet.dstAttr}")
}
```

### Graph Transformations

**mapVertices**: Transform vertex attributes

```scala { .api }
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
```

```scala
// Derive new vertex attributes from existing ones
val upperCaseGraph = graph.mapVertices { (id, name) =>
  name.toUpperCase
}

// To combine vertex attributes with data from another RDD (such as degrees),
// join instead: RDD operations like graph.degrees.lookup(id) cannot be
// called inside a mapVertices closure
val graphWithDegrees = graph.outerJoinVertices(graph.degrees) { (id, attr, degOpt) =>
  (attr, degOpt.getOrElse(0))
}
```

**mapEdges**: Transform edge attributes

```scala { .api }
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2]
```

```scala
// Assign numeric edge weights based on the relationship type
val weightedGraph = graph.mapEdges { edge =>
  edge.attr match {
    case "friend" => 1.0
    case "colleague" => 0.5
    case _ => 0.1
  }
}

// Transform edge attributes using triplet info (mapTriplets can also read
// the vertex attributes on both ends)
val enhancedGraph = graph.mapTriplets { triplet =>
  s"${triplet.srcAttr}-${triplet.attr}-${triplet.dstAttr}"
}
```

### Structural Operations

**reverse**: Reverse edge directions

```scala { .api }
def reverse: Graph[VD, ED]
```

**subgraph**: Extract a subgraph based on predicates

```scala { .api }
def subgraph(
  epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
  vpred: (VertexId, VD) => Boolean = ((v, d) => true)
): Graph[VD, ED]
```

```scala
// Reverse all edges
val reversedGraph = graph.reverse

// Extract a subgraph with only "friend" edges
val friendGraph = graph.subgraph(epred = _.attr == "friend")

// Extract a subgraph with specific vertices
val aliceBobGraph = graph.subgraph(
  vpred = (id, attr) => attr == "Alice" || attr == "Bob"
)

// Filter on both vertex and edge predicates
val specificSubgraph = graph.subgraph(
  epred = triplet => triplet.srcAttr != "Charlie",
  vpred = (id, attr) => attr.length > 3
)
```

**groupEdges**: Merge parallel edges

```scala { .api }
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
```

```scala
// Create a graph with parallel edges
val parallelEdges: RDD[Edge[Int]] = sc.parallelize(Array(
  Edge(1L, 2L, 1),
  Edge(1L, 2L, 2), // Parallel edge
  Edge(2L, 3L, 3)
))

val parallelGraph = Graph.fromEdges(parallelEdges, "user")

// Merge parallel edges by summing weights. groupEdges assumes identical
// edges are colocated, so repartition with partitionBy first
val mergedGraph = parallelGraph
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .groupEdges(_ + _)
```

## VertexRDD and EdgeRDD

### VertexRDD

Specialized RDD for vertices with efficient joins:

```scala { .api }
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  def aggregateUsingIndex[VD2: ClassTag](messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
}
```

```scala
val degrees = graph.degrees

// Transform vertex values
val transformedVertices = graph.vertices.mapValues(_.toUpperCase)

// Join with additional data
val ages: RDD[(VertexId, Int)] = sc.parallelize(Array(
  (1L, 25), (2L, 30), (3L, 35), (4L, 28)
))

val verticesWithAges = graph.vertices.leftJoin(ages) { (id, name, ageOpt) =>
  (name, ageOpt.getOrElse(0))
}
```

### EdgeRDD

Specialized RDD for edges:

```scala { .api }
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
  def reverse: EdgeRDD[ED]
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
}
```
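For illustration, a short sketch applying these `EdgeRDD` operations to the running example (`graph.edges` is already an `EdgeRDD[String]`; the value names are ours):

```scala
// Transform edge attributes without touching the vertex side
val lowered: EdgeRDD[String] = graph.edges.mapValues(e => e.attr.toLowerCase)

// Flip the direction of every edge at the RDD level
val flipped: EdgeRDD[String] = graph.edges.reverse
```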
## GraphOps - Advanced Operations

GraphOps provides additional graph algorithms and utilities, available on any `Graph` through implicit conversion:

```scala { .api }
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  // Degree operations
  def degrees: VertexRDD[Int]
  def inDegrees: VertexRDD[Int]
  def outDegrees: VertexRDD[Int]

  // Neighborhood operations
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

  // Messaging operations
  def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A, tripletFields: TripletFields = TripletFields.All): VertexRDD[A]

  // Pregel API
  def pregel[A: ClassTag](initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD, sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A): Graph[VD, ED]
}
```

### Degree Operations

```scala
// GraphOps methods are available directly on Graph via implicit conversion
val degrees = graph.degrees
val inDegrees = graph.inDegrees
val outDegrees = graph.outDegrees

// Find the vertex with the highest in-degree
val maxInDegree = inDegrees.reduce { (a, b) =>
  if (a._2 > b._2) a else b
}
println(s"Vertex ${maxInDegree._1} has highest in-degree: ${maxInDegree._2}")

// Join degrees with vertex attributes (vertices with no edges are absent
// from degrees, hence the leftJoin and the default of 0)
val verticesWithDegrees = graph.vertices.leftJoin(degrees) { (id, attr, deg) =>
  (attr, deg.getOrElse(0))
}
```

### Neighborhood Operations

```scala
import org.apache.spark.graphx.EdgeDirection

// Collect neighbor IDs and attributes along out-edges
val neighbors = graph.collectNeighbors(EdgeDirection.Out)
neighbors.collect().foreach { case (id, neighborArray) =>
  println(s"Vertex $id has neighbors: ${neighborArray.mkString(", ")}")
}

// Collect neighbor IDs only
val neighborIds = graph.collectNeighborIds(EdgeDirection.In)
```

### Message Passing with aggregateMessages

```scala { .api }
def aggregateMessages[A: ClassTag](
  sendMsg: EdgeContext[VD, ED, A] => Unit,
  mergeMsg: (A, A) => A,
  tripletFields: TripletFields = TripletFields.All
): VertexRDD[A]
```

```scala
import org.apache.spark.graphx.{EdgeContext, TripletFields}

// Count neighbors by sending 1 along each edge in both directions
val neighborCount = graph.aggregateMessages[Int](
  sendMsg = { edgeContext =>
    edgeContext.sendToSrc(1)
    edgeContext.sendToDst(1)
  },
  mergeMsg = _ + _
)

// Compute average neighbor age (assuming vertices carry an age attribute)
val ageGraph: Graph[Int, String] = Graph.fromEdges(edges, defaultValue = 25)

val totalNeighborAge = ageGraph.aggregateMessages[Double](
  sendMsg = { ctx =>
    ctx.sendToSrc(ctx.dstAttr.toDouble)
    ctx.sendToDst(ctx.srcAttr.toDouble)
  },
  mergeMsg = _ + _,
  tripletFields = TripletFields.All
)

// Divide by degree via a join; calling ageGraph.degrees.lookup(id) inside
// mapValues would nest RDD operations, which Spark does not allow
val avgNeighborAge = totalNeighborAge.innerJoin(ageGraph.degrees) {
  (id, totalAge, degree) => totalAge / degree
}
```
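When `sendMsg` reads only one side of each triplet, declaring that through `tripletFields` lets GraphX avoid shipping the unused vertex attributes. A small sketch on `ageGraph`, assuming we only want each vertex's oldest in-neighbor (the value name is ours):

```scala
// sendMsg reads only srcAttr, so TripletFields.Src tells GraphX it need
// not ship destination attributes to the edge partitions
val oldestInNeighbor: VertexRDD[Int] = ageGraph.aggregateMessages[Int](
  sendMsg = ctx => ctx.sendToDst(ctx.srcAttr),
  mergeMsg = (a, b) => math.max(a, b),
  tripletFields = TripletFields.Src
)
```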
### Pregel API

The Pregel API enables iterative graph computations:

```scala { .api }
def pregel[A: ClassTag](
  initialMsg: A,
  maxIterations: Int = Int.MaxValue,
  activeDirection: EdgeDirection = EdgeDirection.Either
)(
  vprog: (VertexId, VD, A) => VD,
  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  mergeMsg: (A, A) => A
): Graph[VD, ED]
```

```scala
import org.apache.spark.graphx.EdgeDirection

// Single-source shortest paths using Pregel. The input vertex attribute
// type is unconstrained because the distances are initialized here
def shortestPath[VD](graph: Graph[VD, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Initialize distances (source = 0.0, all others = infinity)
  val initialGraph = graph.mapVertices { (id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  }

  initialGraph.pregel(
    initialMsg = Double.PositiveInfinity,
    maxIterations = Int.MaxValue,
    activeDirection = EdgeDirection.Out
  )(
    // Vertex program: keep the shorter of the current and received distance
    vprog = (id, dist, newDist) => math.min(dist, newDist),

    // Send message: if this edge offers a shorter path, notify the target
    sendMsg = { triplet =>
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      }
    },

    // Merge messages: take the minimum distance
    mergeMsg = (a, b) => math.min(a, b)
  )
}

// Usage: weightedGraph has Double edge weights from the mapEdges example
val sourceVertex = 1L
val distances = shortestPath(weightedGraph, sourceVertex)
```

## Built-in Graph Algorithms

GraphX includes implementations of common graph algorithms:

### PageRank

```scala { .api }
object PageRank {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}
```

```scala
import org.apache.spark.graphx.lib.PageRank

// Run PageRank for a fixed number of iterations
val pageRanks = PageRank.run(graph, numIter = 10)

// Run PageRank until convergence
val convergedRanks = PageRank.runUntilConvergence(graph, tol = 0.0001)

// Get the vertices with the highest PageRank
val topVertices = pageRanks.vertices.top(3)(Ordering.by(_._2))
topVertices.foreach { case (id, rank) =>
  println(s"Vertex $id: PageRank = $rank")
}
```

### Connected Components

```scala { .api }
object ConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
}
```

```scala
import org.apache.spark.graphx.lib.ConnectedComponents

// Each vertex is labeled with the lowest vertex ID in its component
val ccGraph = ConnectedComponents.run(graph)

// Count vertices per connected component
val componentSizes = ccGraph.vertices
  .map(_._2)       // Extract component ID
  .countByValue()  // Count vertices per component

componentSizes.foreach { case (componentId, size) =>
  println(s"Component $componentId has $size vertices")
}
```
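Because the component label replaces the original vertex attribute, a join is needed to see both together. A small sketch over the running example (the value name is ours):

```scala
// Pair each vertex's original name with its component ID
val namedComponents = graph.vertices.innerJoin(ccGraph.vertices) {
  (id, name, componentId) => (name, componentId)
}

namedComponents.collect().foreach { case (id, (name, componentId)) =>
  println(s"$name belongs to component $componentId")
}
```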
### Triangle Counting

```scala { .api }
object TriangleCount {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}
```

```scala
import org.apache.spark.graphx.lib.TriangleCount

// Count triangles (the graph must be in canonical form, with the lower
// vertex ID as the source of each edge)
val canonicalGraph = graph.convertToCanonicalEdges()
val triangleCounts = TriangleCount.run(canonicalGraph)

// Find the vertex involved in the most triangles
val maxTriangles = triangleCounts.vertices.reduce { (a, b) =>
  if (a._2 > b._2) a else b
}
println(s"Vertex ${maxTriangles._1} is in ${maxTriangles._2} triangles")
```

### Strongly Connected Components

```scala { .api }
object StronglyConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}
```

```scala
import org.apache.spark.graphx.lib.StronglyConnectedComponents

val sccGraph = StronglyConnectedComponents.run(graph, numIter = 10)

// Count the strongly connected components
val sccSizes = sccGraph.vertices
  .map(_._2)
  .countByValue()

println(s"Found ${sccSizes.size} strongly connected components")
```

## Graph Partitioning

Control how graph edges are partitioned across the cluster:

```scala { .api }
object PartitionStrategy {
  val EdgePartition1D: PartitionStrategy
  val EdgePartition2D: PartitionStrategy
  val RandomVertexCut: PartitionStrategy
  val CanonicalRandomVertexCut: PartitionStrategy
}
```

```scala
import org.apache.spark.graphx.{Graph, PartitionStrategy}

// Create a graph with a specific partitioning strategy
val partitionedGraph = Graph(vertices, edges)
  .partitionBy(PartitionStrategy.EdgePartition2D, 4)

// Repartition an existing graph
val repartitionedGraph = graph.partitionBy(PartitionStrategy.RandomVertexCut, 8)
```

## Performance Optimization

### Graph Caching

```scala
// Cache the graph for iterative algorithms
val cachedGraph = graph.cache()

// Unpersist when done
cachedGraph.unpersist()
```

### Efficient Graph Construction

```scala
// For large graphs, construct, partition, and cache in one pipeline
val efficientGraph = Graph.fromEdges(edges, defaultValue = "default")
  .partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 4)
  .cache()

// Materialize the cached graph
efficientGraph.vertices.count()
efficientGraph.edges.count()
```

This guide covers the core GraphX API for building scalable graph-processing applications in Apache Spark.