Apache Spark - Unified analytics engine for large-scale data processing
—
GraphX provides APIs for graphs and graph-parallel computation with fundamental operators like subgraph, joinVertices, and aggregateMessages, plus optimized variants of Pregel and graph algorithms including PageRank, connected components, and triangle counting.
Main graph abstraction representing a directed multigraph with user-defined vertex and edge attributes.
/**
 * Main graph abstraction in GraphX: a directed multigraph with
 * user-defined attributes attached to every vertex and edge.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] {

  /** Vertices of the graph with their attributes. */
  val vertices: VertexRDD[VD]

  /** Edges of the graph with their attributes. */
  val edges: EdgeRDD[ED]

  /** Edge triplets: each edge joined with its source and destination vertex attributes. */
  val triplets: RDD[EdgeTriplet[VD, ED]]

  /** Transform each vertex attribute with the supplied function. */
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

  /** Transform each edge attribute with the supplied function. */
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

  /** Transform each edge attribute, with access to the adjacent vertex attributes. */
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

  /** Return a new graph with the direction of every edge reversed. */
  def reverse: Graph[VD, ED]

  /**
   * Restrict the graph to the vertices satisfying `vpred` and the edges
   * satisfying `epred`; both predicates default to accepting everything.
   */
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true)
  ): Graph[VD, ED]

  /** Join vertices with an external table keyed by vertex id, updating matching vertex attributes. */
  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]

  /**
   * Aggregate values sent along edges into each destination/source vertex.
   *
   * @param sendMsg       invoked on every edge; may send messages to either endpoint
   * @param mergeMsg      commutative, associative combiner for messages to the same vertex
   * @param tripletFields which triplet fields `sendMsg` reads (narrowing enables optimization)
   */
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All
  ): VertexRDD[A]

  /** Number of vertices in the graph. */
  def numVertices: Long

  /** Number of edges in the graph. */
  def numEdges: Long

  /** In-degree of each vertex. */
  def inDegrees: VertexRDD[Int]

  /** Out-degree of each vertex. */
  def outDegrees: VertexRDD[Int]

  /** Total degree (in + out) of each vertex. */
  def degrees: VertexRDD[Int]

  /** Persist vertices and edges at the given storage level. */
  def persist(newLevel: StorageLevel): Graph[VD, ED]

  /** Cache the graph at the default (memory) storage level. */
  def cache(): Graph[VD, ED]

  /** Uncache the graph, optionally blocking until all blocks are removed. */
  def unpersist(blocking: Boolean = false): Graph[VD, ED]
}

Usage Examples:
import org.apache.spark.graphx._

// Create a graph from vertex and edge RDDs.
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie"), (4L, "David")
))
val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "follow"),
  Edge(3L, 4L, "friend")
))
val graph = Graph(vertices, edges)

// Transform vertices: upper-case every name.
val upperGraph = graph.mapVertices((id, name) => name.toUpperCase)

// Filter a subgraph: keep only vertices whose name is longer than 3 characters.
val youngUsers = graph.subgraph(vpred = (id, name) => name.length > 3)

// Aggregate messages: compute total degree by sending 1 to both endpoints of every edge.
val degrees = graph.aggregateMessages[Int](
  triplet => {
    triplet.sendToSrc(1)
    triplet.sendToDst(1)
  },
  (a, b) => a + b
)

Factory methods and utilities for creating graphs.
/**
 * Companion object providing factory methods for constructing graphs.
 */
object Graph {

  /**
   * Create a graph from a vertex RDD and an edge RDD.
   *
   * Vertices referenced by an edge but missing from `vertices` receive
   * `defaultVertexAttr`. The default value makes two-argument calls such
   * as `Graph(vertices, edges)` compile, matching the Spark GraphX API.
   */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD]
  ): Graph[VD, ED]

  /** Create a graph from edges only, assigning `defaultValue` to every vertex. */
  def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD
  ): Graph[VD, ED]

  /**
   * Create a graph from raw (srcId, dstId) tuples; edge attributes are Int.
   * If `uniqueEdges` is set, duplicate edges are merged after partitioning
   * with the given strategy.
   */
  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None
  ): Graph[VD, Int]
}
/**
 * GraphLoader provides utilities for loading graphs from files.
 */
object GraphLoader {

  /**
   * Load a graph from an edge-list file (one source/destination id pair per
   * line). Vertex and edge attributes are set to 1, per the `Graph[Int, Int]`
   * return type.
   *
   * @param canonicalOrientation if true, orient all edges in canonical direction (srcId < dstId)
   * @param numEdgePartitions    number of edge partitions, or -1 for the default parallelism
   * @param edgeStorageLevel     storage level for the edge RDD
   * @param vertexStorageLevel   storage level for the vertex RDD
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[Int, Int]
}

Usage Examples:
// Load from an edge-list file.
val graph = GraphLoader.edgeListFile(sc, "hdfs://path/to/edges.txt")

// Create from raw edge tuples; every vertex gets the default attribute.
val edgeTuples = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L)))
val tupleGraph = Graph.fromEdgeTuples(edgeTuples, "defaultVertex")

// Create from edges only; vertices referenced by edges get the attribute "missing".
val edgeList = sc.parallelize(Array(
  Edge(1L, 2L, 1.0),
  Edge(2L, 3L, 2.0)
))
val edgeGraph = Graph.fromEdges(edgeList, "missing")

Fundamental types used in graph processing.
/** Vertex identifier type (64-bit integer) */
type VertexId = Long
/**
 * A directed edge from `srcId` to `dstId` carrying attribute `attr`
 */
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
/**
 * An edge together with the attributes of its source and destination vertices
 */
class EdgeTriplet[VD, ED] extends Edge[ED] {
/** Source vertex attribute */
var srcAttr: VD = _
/** Destination vertex attribute */
var dstAttr: VD = _
/** Populate this triplet from an edge and the attributes of both endpoints, returning the updated triplet */
def set(other: Edge[ED], srcAttr: VD, dstAttr: VD): EdgeTriplet[VD, ED]
}
/**
 * Context passed to the `sendMsg` function of `aggregateMessages`,
 * exposing the edge, its endpoint attributes, and message-sending callbacks.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 * @tparam A  message type
 */
abstract class EdgeContext[VD, ED, A] {
  /** Source vertex id. */
  def srcId: VertexId
  /** Destination vertex id. */
  def dstId: VertexId
  /** Source vertex attribute. */
  def srcAttr: VD
  /** Destination vertex attribute. */
  def dstAttr: VD
  /** Edge attribute. */
  def attr: ED
  /** Send a message to the source vertex. */
  def sendToSrc(msg: A): Unit
  /** Send a message to the destination vertex. */
  def sendToDst(msg: A): Unit
}

Specialized RDD for vertices with efficient joins and graph operations.
/**
 * An RDD of (VertexId, VD) pairs pre-indexed by vertex id for fast joins.
 */
abstract class VertexRDD[VD: ClassTag] extends RDD[(VertexId, VD)] {

  /** Keep only the vertices satisfying the predicate (tuple form). */
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]

  /** Keep only the vertices satisfying the predicate (id/value form). */
  def filter(pred: (VertexId, VD) => Boolean): VertexRDD[VD]

  /** Transform vertex values, preserving the internal index. */
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]

  /** Transform vertex values with access to the vertex id, preserving the internal index. */
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

  /** Vertices whose values differ between this RDD and `other` (per Spark docs, keeps only changed vertices). */
  def diff(other: VertexRDD[VD]): VertexRDD[VD]

  /** Left outer join with another RDD keyed by vertex id; `f` receives None where `other` has no entry. */
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
      f: (VertexId, VD, Option[VD2]) => VD3
  ): VertexRDD[VD3]

  /** Inner join with another RDD keyed by vertex id; keeps only vertices present in both. */
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
      f: (VertexId, VD, U) => VD2
  ): VertexRDD[VD2]

  /** Combine messages keyed by vertex id using this RDD's index, reducing duplicates with `reduceFunc`. */
  def aggregateUsingIndex[VD2: ClassTag](
      messages: RDD[(VertexId, VD2)],
      reduceFunc: (VD2, VD2) => VD2
  ): VertexRDD[VD2]
}

Specialized RDD for edges with graph-specific optimizations.
/**
 * An RDD of edges with a storage layout optimized for graph operations.
 */
abstract class EdgeRDD[ED: ClassTag] extends RDD[Edge[ED]] {

  /** Transform edge attributes, preserving structure and partitioning. */
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

  /** Reverse the direction of every edge. */
  def reverse: EdgeRDD[ED]

  /** Keep only the edges whose triplet satisfies the predicate. */
  def filter(pred: EdgeTriplet[_, ED] => Boolean): EdgeRDD[ED]

  /** Join with vertex attributes, producing new edge attributes from (srcId, dstId, attr, srcAttr, dstAttr). */
  def innerJoin[VD: ClassTag, ED2: ClassTag](other: VertexRDD[VD])(
      f: (VertexId, VertexId, ED, VD, VD) => ED2
  ): EdgeRDD[ED2]
}

Pre-implemented graph algorithms available through GraphOps.
/**
 * Additional operations available on Graph through implicit conversion
 */
implicit class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
/** Run PageRank until the per-iteration change drops below `tol`; `resetProb` is the random-reset probability */
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
/** Run PageRank for exactly `numIter` iterations */
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
/** Find connected components; each vertex is labelled with a component id (a VertexId) */
def connectedComponents(): Graph[VertexId, ED]
/** Count triangles passing through each vertex */
def triangleCount(): Graph[Int, ED]
/** Find strongly connected components, running at most `numIter` iterations */
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
/** Collect neighbor IDs for each vertex along the given edge direction */
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
/** Collect neighbor (id, attribute) pairs for each vertex along the given edge direction */
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
}
/**
 * Direction of edges relative to a vertex, used when collecting neighbors
 * and as Pregel's `activeDirection`.
 *
 * Modeled as a sealed ADT rather than `scala.Enumeration`: with an
 * Enumeration, `EdgeDirection` is only an object, not a type, so signatures
 * such as `collectNeighborIds(edgeDirection: EdgeDirection)` would not
 * typecheck. Call sites like `EdgeDirection.Out` are unchanged.
 */
sealed trait EdgeDirection
object EdgeDirection {
  /** Edges arriving at the vertex. */
  case object In extends EdgeDirection
  /** Edges leaving the vertex. */
  case object Out extends EdgeDirection
  /** Edges in either direction. */
  case object Either extends EdgeDirection
  /** Edges in both directions. */
  case object Both extends EdgeDirection
}

Usage Examples:
import org.apache.spark.graphx.lib._

// PageRank: run to convergence tolerance 0.0001 with reset probability 0.15.
val pageRankGraph = graph.pageRank(0.0001, 0.15)
val pageRanks = pageRankGraph.vertices.collect()

// Connected components: each vertex is labelled with its component id.
val ccGraph = graph.connectedComponents()
val components = ccGraph.vertices.collect()

// Triangle count: number of triangles passing through each vertex.
val triangleCountGraph = graph.triangleCount()
val triangleCounts = triangleCountGraph.vertices.collect()

// Collect the ids of each vertex's out-neighbors.
val neighbors = graph.collectNeighborIds(EdgeDirection.Out)

Pregel-style bulk-synchronous message-passing abstraction.
/**
 * Bulk-synchronous, Pregel-style iterative graph computation.
 */
object Pregel {

  /**
   * Run a Pregel computation until no messages remain or `maxIterations`
   * supersteps have executed.
   *
   * @param graph           input graph
   * @param initialMsg      message delivered to every vertex before the first superstep
   * @param maxIterations   upper bound on the number of supersteps
   * @param activeDirection which edge direction may send messages in a superstep
   * @param vprog           vertex program merging an incoming message into the vertex value
   * @param sendMsg         produces (targetVertexId, message) pairs from an edge triplet
   * @param mergeMsg        commutative, associative combiner for messages to the same vertex
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
      graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either
  )(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A
  ): Graph[VD, ED]
}

Usage Examples:
// Single-source shortest paths using Pregel.
def shortestPaths(graph: Graph[Double, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Initialize: distance 0 at the source, +infinity everywhere else.
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  )
  Pregel(initialGraph, Double.PositiveInfinity)(
    // Vertex program: keep the minimum of the current and incoming distance.
    (id, dist, newDist) => math.min(dist, newDist),
    // Send message: relax an edge only when it improves the destination's distance.
    triplet => {
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      }
    },
    // Merge messages: take the minimum candidate distance.
    (a, b) => math.min(a, b)
  )
}

Strategies for partitioning graphs across cluster nodes.
/**
 * Strategies for assigning edges to partitions when distributing a graph.
 *
 * Modeled as a sealed ADT rather than `scala.Enumeration`: with an
 * Enumeration, `PartitionStrategy` is only an object, not a type, so uses
 * such as `Option[PartitionStrategy]` in `Graph.fromEdgeTuples` and the
 * `partitionBy` parameter would not typecheck. Call sites like
 * `PartitionStrategy.EdgePartition2D` are unchanged.
 */
sealed trait PartitionStrategy
object PartitionStrategy {
  /** Randomly assign edges to partitions. */
  case object RandomVertexCut extends PartitionStrategy
  /** Assign edges to partitions based on the source vertex hash. */
  case object EdgePartition1D extends PartitionStrategy
  /** Two-dimensional partitioning of the adjacency matrix. */
  case object EdgePartition2D extends PartitionStrategy
  /** Like RandomVertexCut, but co-locates the two directions of the same vertex pair. */
  case object CanonicalRandomVertexCut extends PartitionStrategy
}
/**
 * Repartitioning operations for distributing a graph across the cluster.
 */
implicit class GraphPartitioning[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Repartition the edges using the given strategy, keeping the current partition count. */
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  /** Repartition the edges using the given strategy into `numPartitions` partitions. */
  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
}

Loading and saving graphs from various formats.
/**
 * Convenience operations for saving a graph's vertices and edges as text.
 */
implicit class GraphWriter[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Save the vertices as a text file at `path`. */
  def saveVerticesAsTextFile(path: String): Unit
  /** Save the edges as a text file at `path`. */
  def saveEdgesAsTextFile(path: String): Unit
}

Usage Examples:
// Repartition the graph with a two-dimensional partitioning strategy.
val partitionedGraph = graph
  .partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 4)

// Save vertices and edges as text files.
graph.vertices.saveAsTextFile("hdfs://path/to/vertices")
graph.edges.saveAsTextFile("hdfs://path/to/edges")

// A small end-to-end graph-analysis pipeline.
val socialGraph = Graph.fromEdgeTuples(friendships, "User")

// Rank users by PageRank score, highest first.
val pageRanks = socialGraph
  .pageRank(0.0001)
  .vertices
  .sortBy(_._2, ascending = false)

// Group users by connected component.
val communities = socialGraph
  .connectedComponents()
  .vertices
  .map { case (userId, componentId) => (componentId, userId) }
  .groupByKey()
  .collect()

Common GraphX exceptions:
- IllegalArgumentException - Invalid graph construction parameters
- SparkException - General Spark execution errors during graph operations
- ClassCastException - Type mismatches in vertex/edge attributes

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12