Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.
—
GraphX is Apache Spark's API for graphs and graph-parallel computation. It extends the Spark RDD abstraction with a resilient distributed property graph where vertices and edges have properties.
Graph processing functionality is available through:
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Create vertices RDD
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
(1L, "Alice"),
(2L, "Bob"),
(3L, "Charlie"),
(4L, "David")
))
// Create edges RDD
val edges: RDD[Edge[String]] = sc.parallelize(Array(
Edge(1L, 2L, "friend"),
Edge(2L, 3L, "friend"),
Edge(3L, 4L, "colleague"),
Edge(4L, 1L, "friend")
))
// Create graph
val graph = Graph(vertices, edges)
// Basic graph operations
println(s"Number of vertices: ${graph.vertices.count()}")
println(s"Number of edges: ${graph.edges.count()}")
// Graph algorithms
val ranks = graph.pageRank(0.0001).vertices
val connectedComponents = graph.connectedComponents().vertices
// Join with original vertex names
val ranksByUsername = vertices.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
ranksByUsername.collect().foreach(println)

The fundamental graph abstraction representing a property graph with typed vertex and edge properties.
abstract class Graph[VD: ClassTag, ED: ClassTag] extends Serializable {
// Graph structure
def vertices: VertexRDD[VD]
def edges: EdgeRDD[ED]
def triplets: RDD[EdgeTriplet[VD, ED]]
// Basic operations
def numEdges: Long
def numVertices: Long
def inDegrees: VertexRDD[Int]
def outDegrees: VertexRDD[Int]
def degrees: VertexRDD[Int]
// Transformations
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2, tripletFields: TripletFields): Graph[VD, ED2]
// Structural operations
def reverse: Graph[VD, ED]
def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
// Join operations
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
// Graph algorithms
def connectedComponents(): Graph[VertexId, ED]
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
def triangleCount(): Graph[Int, ED]
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
// Aggregation
def aggregateMessages[A: ClassTag](
sendMsg: EdgeContext[VD, ED, A] => Unit,
mergeMsg: (A, A) => A,
tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
// Pregel API
def pregel[A: ClassTag](initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A): Graph[VD, ED]
// Persistence
def cache(): Graph[VD, ED]
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def unpersist(blocking: Boolean = true): Graph[VD, ED]
def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
// Checkpointing
def checkpoint(): Unit
def isCheckpointed: Boolean
def getCheckpointFiles: Seq[String]
}
object Graph {
def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD],
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]], defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]
}

A specialized RDD for representing vertices in a graph.
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
// RDD operations optimized for vertices
def mapVertexPartitions[VD2: ClassTag](f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]): VertexRDD[VD2]
// Join operations
def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])
(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def leftZipJoin[VD2: ClassTag, VD3: ClassTag](other: VertexRDD[VD2])
(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
// Aggregation
def aggregateUsingIndex[VD2: ClassTag](messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
// Set operations
def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
def minus(other: VertexRDD[VD]): VertexRDD[VD]
// Conversion
def toRDD: RDD[(VertexId, VD)]
}

A specialized RDD for representing edges in a graph.
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
// RDD operations optimized for edges
def mapEdgePartitions[ED2: ClassTag, VD: ClassTag](f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD]): EdgeRDD[ED2]
// Join operations
def innerJoin[ED2: ClassTag, ED3: ClassTag](other: RDD[Edge[ED2]])
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
// Conversion
def toRDD: RDD[Edge[ED]]
}

Basic edge data structures.
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED) extends Serializable
class EdgeTriplet[VD, ED] extends Edge[ED] {
def srcAttr: VD
def dstAttr: VD
def otherVertexAttr(vid: VertexId): VD
def otherVertexId(vid: VertexId): VertexId
def relativizeDirection(vid: VertexId): EdgeDirection
def toTuple: ((VertexId, VD), (VertexId, VD), ED)
}
type VertexId = Long
object EdgeDirection extends Enumeration {
type EdgeDirection = Value
val In, Out, Either, Both = Value
}

GraphX provides implementations of common graph algorithms.
object PageRank {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
def runWithOptions[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
srcId: Option[VertexId] = None): Graph[Double, Double]
}

Usage example:
val graph: Graph[String, String] = // ... create graph
val ranks = graph.pageRank(0.0001).vertices
val topRanks = ranks.top(10)(Ordering.by(_._2))

object ConnectedComponents {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxIterations: Int): Graph[VertexId, ED]
}

object StronglyConnectedComponents {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}

object TriangleCount {
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}

object LabelPropagation {
def run[ED: ClassTag](graph: Graph[Int, ED], maxSteps: Int): Graph[VertexId, ED]
}

object ShortestPaths {
def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED]
type SPMap = Map[VertexId, Int]
}

Utilities for loading graphs from various formats.
object GraphLoader {
def edgeListFile(sc: SparkContext, path: String, canonicalOrientation: Boolean = false,
numEdgePartitions: Int = -1, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[Int, Int]
}

Usage example:
// Load graph from edge list file
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")
// Edge list file format: srcId dstId (one edge per line)
// Example content:
// 1 2
// 2 3
// 3 1

Control how graph data is distributed across the cluster.
abstract class PartitionStrategy extends Serializable {
def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}
object PartitionStrategy {
case object RandomVertexCut extends PartitionStrategy
case object EdgePartition1D extends PartitionStrategy
case object EdgePartition2D extends PartitionStrategy
case object CanonicalRandomVertexCut extends PartitionStrategy
def fromString(s: String): PartitionStrategy
}

GraphX provides the aggregateMessages API for efficient message passing.
abstract class EdgeContext[VD, ED, A] {
def srcId: VertexId
def dstId: VertexId
def srcAttr: VD
def dstAttr: VD
def attr: ED
def sendToSrc(msg: A): Unit
def sendToDst(msg: A): Unit
def toEdgeTriplet: EdgeTriplet[VD, ED]
}
case class TripletFields(useSrc: Boolean = true, useDst: Boolean = true, useEdge: Boolean = true)
object TripletFields {
val None = TripletFields(false, false, false)
val EdgeOnly = TripletFields(false, false, true)
val Src = TripletFields(true, false, false)
val Dst = TripletFields(false, true, false)
val All = TripletFields(true, true, true)
}

Usage example:
val graph: Graph[Double, Double] = // ... create graph
// Compute sum of neighbor values
val neighborSum = graph.aggregateMessages[Double](
triplet => {
// Send source attribute to destination
triplet.sendToDst(triplet.srcAttr)
// Send destination attribute to source
triplet.sendToSrc(triplet.dstAttr)
},
// Merge function
(a, b) => a + b
)
// Update vertex attributes with neighbor sums
val newGraph = graph.outerJoinVertices(neighborSum) { (vid, oldAttr, msgSum) =>
msgSum.getOrElse(0.0)
}

The Pregel API is a vertex-centric approach to graph computation.
object Pregel {
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED], initialMsg: A,
maxIterations: Int = Int.MaxValue,
activeDirection: EdgeDirection = EdgeDirection.Either)
(vprog: (VertexId, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
mergeMsg: (A, A) => A): Graph[VD, ED]
}

Usage example:
// Single-source shortest path using Pregel
def shortestPaths[ED: ClassTag](graph: Graph[Double, ED], sourceId: VertexId): Graph[Double, ED] = {
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
// Vertex program: update vertex value with minimum distance
(id, dist, newDist) => math.min(dist, newDist),
// Send message: send distance + edge weight to neighbors
triplet => {
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
// Merge messages: take minimum distance
(a, b) => math.min(a, b)
)
sssp
}

Additional utilities for graph processing.
object GraphGenerators {
def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0, sigma: Double = 1.3,
seed: Long = -1): Graph[Long, Int]
def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int, a: Double = 0.45, b: Double = 0.15,
c: Double = 0.15, d: Double = 0.25, seed: Long = -1, numEParts: Int = 0): Graph[Int, Int]
def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int]
def gridGraph(sc: SparkContext, height: Int, width: Int): Graph[(Int, Int), Double]
}

Usage example:
// Generate a synthetic graph
val syntheticGraph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)
// Run PageRank on synthetic graph
val ranks = syntheticGraph.pageRank(0.001).vertices
val topVertices = ranks.top(10)(Ordering.by(_._2))

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13