Apache Spark GraphX is a graph processing library that provides fundamental graph operations and algorithms for Apache Spark.

npx @tessl/cli install tessl/maven-org-apache-spark--spark-graphx-2-12@2.4.0

Apache Spark GraphX is a distributed graph processing library built on top of Apache Spark. It provides a comprehensive set of graph algorithms and operations for large-scale analytics on property graphs (graphs whose vertices and edges carry attributes), along with core operations optimized for distributed computing environments.
Installation: add the dependency below to `pom.xml`, or include the GraphX JAR in the Spark application classpath.

Maven Dependency:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.12</artifactId>
<version>2.4.8</version>
</dependency>

// Core imports: the GraphX API, the algorithm library, and RDD.
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._
import org.apache.spark.rdd.RDD

For specific components:

import org.apache.spark.graphx.{Graph, VertexId, Edge, EdgeDirection}
import org.apache.spark.graphx.lib.{PageRank, ConnectedComponents, TriangleCount}

// Quick start.
// `sc` is not defined in this snippet: it must be an existing SparkContext
// (for example, the one provided by spark-shell).
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Create vertices RDD: (VertexId, VertexAttribute)
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
(1L, "Alice"), (2L, "Bob"), (3L, "Charlie")
))
// Create edges RDD: Edge(srcId, dstId, attribute)
val edges: RDD[Edge[String]] = sc.parallelize(Array(
Edge(1L, 2L, "friend"),
Edge(2L, 3L, "colleague"),
Edge(3L, 1L, "neighbor")
))
// Create graph
val graph = Graph(vertices, edges)
// Basic graph operations
println(s"Graph has ${graph.numVertices} vertices and ${graph.numEdges} edges")
// Run PageRank algorithm
val pageRankGraph = graph.pageRank(0.0001)
// Get vertex degrees
val inDegrees = graph.inDegrees
val outDegrees = graph.outDegrees

GraphX is built around several key components:
- `Graph[VD, ED]`: an abstract class representing an immutable property graph, with vertex attribute type `VD` and edge attribute type `ED`.
- `VertexRDD` and `EdgeRDD`: specialized RDDs with optimized storage and fast join operations.
- Message passing: the `aggregateMessages` operator and the Pregel API.

Fundamental graph abstractions and operations for creating, transforming, and querying property graphs with type-safe vertex and edge attributes.
// Property-graph abstraction (simplified signature listing).
// VD is the vertex attribute type and ED the edge attribute type; both carry a
// ClassTag context bound.
abstract class Graph[VD: ClassTag, ED: ClassTag] {
// The vertices, each paired with its attribute of type VD.
val vertices: VertexRDD[VD]
// The edges, each carrying an attribute of type ED.
val edges: EdgeRDD[ED]
// Edges together with the attributes of their source and destination vertices.
val triplets: RDD[EdgeTriplet[VD, ED]]
// Returns a graph whose vertex attributes are map(id, attr); the edge type ED is unchanged.
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
// Returns a graph whose edge attributes are map(edge); the vertex type VD is unchanged.
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
// Returns a graph of the same type restricted by an edge predicate (epred, evaluated on
// triplets) and a vertex predicate (vpred, evaluated on id and attribute).
def subgraph(epred: EdgeTriplet[VD, ED] => Boolean, vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
// Message-passing primitive: sendMsg receives an EdgeContext[VD, ED, A], and mergeMsg
// combines two messages of type A. The merged messages are returned per vertex as a VertexRDD[A].
def aggregateMessages[A: ClassTag](sendMsg: EdgeContext[VD, ED, A] => Unit, mergeMsg: (A, A) => A): VertexRDD[A]
}
// Factory methods for building a Graph (simplified signature listing).
object Graph {
// Builds a graph from (VertexId, attribute) pairs and an RDD of edges.
def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]]): Graph[VD, ED]
// Builds a graph from edges alone; vertex attributes presumably all start as defaultValue — confirm.
def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]], defaultValue: VD): Graph[VD, ED]
// Builds a graph from raw (srcId, dstId) pairs. The edge attribute type is Int
// (NOTE(review): presumably 1 per edge — confirm); vertices are given defaultValue.
def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD): Graph[VD, Int]
}

Graph analytics operations for computing structural properties, degrees, neighborhoods, and graph metrics.
// Derived statistics and neighborhood queries for a Graph[VD, ED].
// NOTE(review): the quick-start example calls these members directly on a Graph
// (e.g. graph.inDegrees), presumably via an implicit conversion to GraphOps — confirm.
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
// Edge and vertex counts; declared lazy, so each is computed once, on first access.
lazy val numEdges: Long
lazy val numVertices: Long
// Per-vertex in-degree, out-degree and total degree.
lazy val inDegrees: VertexRDD[Int]
lazy val outDegrees: VertexRDD[Int]
lazy val degrees: VertexRDD[Int]
// For each vertex, the ids of its neighbors along the given edge direction.
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
// For each vertex, the (id, attribute) pairs of its neighbors along the given edge direction.
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
}

Specialized RDD implementations optimized for graph operations with fast joins and efficient storage.
// RDD of (VertexId, attribute) pairs with graph-oriented operations.
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// Transforms each vertex attribute with f; vertex ids are unchanged.
def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
// Left join with `other`: f receives Some(value) when `other` has an entry for the id,
// and None otherwise.
def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
// Inner join with `other`: f receives the value from `other` directly (no Option), so only
// ids present on both sides presumably appear in the result — confirm.
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
}
// RDD of Edge[ED] with graph-oriented operations.
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
// Transforms each edge attribute with f; f sees the whole Edge, including its endpoint ids.
def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
// NOTE(review): presumably swaps the source and destination of every edge — confirm.
def reverse: EdgeRDD[ED]
}

Pre-implemented graph algorithms for common analytics tasks, including centrality, community detection, and path finding.
// PageRank Algorithm
// Both entry points accept any Graph[VD, ED] and return a Graph[Double, Double];
// resetProb defaults to 0.15 in both.
// NOTE(review): presumably the Double vertex attribute is each vertex's rank — confirm.
object PageRank {
// Fixed-iteration variant: runs numIter iterations.
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
// Convergence variant: iterates until convergence within tolerance tol.
def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}
// Connected Components
// Returns a graph whose vertex attribute has type VertexId; edge attributes keep type ED.
// NOTE(review): presumably each vertex is labeled with the lowest vertex id in its
// component — confirm.
object ConnectedComponents {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
}
// Triangle Counting
// Returns a graph whose vertex attribute has type Int; edge attributes keep type ED.
// NOTE(review): presumably the Int is the number of triangles through each vertex — confirm.
object TriangleCount {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}

Bulk synchronous message passing framework for implementing custom graph algorithms using a vertex-centric programming model.
// Core message passing
// This is a member of Graph[VD, ED] (same signature as in the Graph listing), shown here
// on its own. sendMsg receives an EdgeContext[VD, ED, A]; mergeMsg combines two messages
// of type A; the merged results are returned per vertex as a VertexRDD[A].
def aggregateMessages[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A
): VertexRDD[A]
// Pregel API
// Iterative vertex-centric computation. The first parameter list configures the run;
// maxIterations defaults to Int.MaxValue and activeDirection to EdgeDirection.Either.
// The second parameter list supplies the user functions:
//   vprog    - computes a vertex's new attribute from its id, current attribute and a message
//   sendMsg  - emits (target vertex id, message) pairs for an edge triplet
//   mergeMsg - combines two messages of type A
// NOTE(review): initialMsg is presumably delivered to every vertex before the first
// superstep — confirm.
object Pregel {
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
graph: Graph[VD, ED],
initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either
)(
vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A
): Graph[VD, ED]
}

Utilities for loading graphs from files, partitioning strategies, and configuration options.
// Loads a Graph[Int, Int] from an edge-list file at `path`.
// Optional parameters and their defaults: canonicalOrientation = false,
// numEdgePartitions = -1, and StorageLevel.MEMORY_ONLY for both edges and vertices.
// NOTE(review): the expected file format (presumably one "srcId dstId" pair per line) is
// not shown here — confirm.
// NOTE(review): SparkContext and StorageLevel are not covered by the imports listed earlier;
// presumably org.apache.spark.SparkContext and org.apache.spark.storage.StorageLevel — confirm.
object GraphLoader {
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
numEdgePartitions: Int = -1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[Int, Int]
}
// Strategy for assigning an edge, identified by its source and destination vertex ids,
// to one of numParts partitions.
// NOTE(review): the result is presumably expected to lie in [0, numParts) — confirm.
trait PartitionStrategy {
def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

Core type definitions used throughout GraphX:
// Package-level type aliases
// Vertex identifiers are plain 64-bit Longs.
type VertexId = Long
// Partition identifiers are plain Ints.
type PartitionID = Int
// Edge types
// An edge from srcId to dstId carrying an attribute of type ED.
// Its fields are declared `var`, so they are mutable.
case class Edge[ED](var srcId: VertexId, var dstId: VertexId, var attr: ED) {
// Given one endpoint's id, returns the other endpoint's id.
// NOTE(review): presumably vid must equal srcId or dstId — confirm.
def otherVertexId(vid: VertexId): VertexId
// Direction of this edge relative to the vertex vid, as an EdgeDirection.
def relativeDirection(vid: VertexId): EdgeDirection
}
// An Edge extended with the attributes of its source and destination vertices.
class EdgeTriplet[VD, ED] extends Edge[ED] {
// Attribute of the source vertex.
var srcAttr: VD
// Attribute of the destination vertex.
var dstAttr: VD
// Given one endpoint's id, returns the attribute of the other endpoint.
def otherVertexAttr(vid: VertexId): VD
}
// Edge direction enumeration
// The constructor is private, so instances can only be created inside EdgeDirection and its
// companion object, which defines the constants In, Out, Either and Both.
class EdgeDirection private (private val name: String) extends Serializable {
// NOTE(review): presumably swaps In and Out and leaves Either and Both unchanged — confirm.
def reverse: EdgeDirection
override def toString: String
// equals and hashCode are overridden together, which keeps them consistent.
override def equals(o: Any): Boolean
override def hashCode: Int
}
// The EdgeDirection values, passed to APIs such as collectNeighborIds and Pregel.
object EdgeDirection {
// NOTE(review): presumably edges arriving at a vertex — confirm.
final val In: EdgeDirection
// NOTE(review): presumably edges originating from a vertex — confirm.
final val Out: EdgeDirection
// NOTE(review): presumably edges originating from OR arriving at a vertex — confirm.
// This is the default activeDirection in the Pregel listing.
final val Either: EdgeDirection
// NOTE(review): presumably edges originating from AND arriving at a vertex — confirm.
final val Both: EdgeDirection
}