CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-parent-2-13

Apache Spark is a unified analytics engine for large-scale data processing with high-level APIs in Scala, Java, Python, and R.

Pending
Overview
Eval results
Files

graph-processing.mddocs/

Graph Processing

GraphX is Apache Spark's API for graphs and graph-parallel computation. It extends the Spark RDD abstraction with a resilient distributed property graph where vertices and edges have properties.

Package Information

Graph processing functionality is available through:

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

Basic Usage

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create vertices RDD
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie"),
  (4L, "David")
))

// Create edges RDD
val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 4L, "colleague"),
  Edge(4L, 1L, "friend")
))

// Create graph
val graph = Graph(vertices, edges)

// Basic graph operations
println(s"Number of vertices: ${graph.vertices.count()}")
println(s"Number of edges: ${graph.edges.count()}")

// Graph algorithms
val ranks = graph.pageRank(0.0001).vertices
val connectedComponents = graph.connectedComponents().vertices

// Join with original vertex names
val ranksByUsername = vertices.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}

ranksByUsername.collect().foreach(println)

Capabilities

Core Graph Types

Graph

The fundamental graph abstraction representing a property graph with typed vertex and edge properties.

abstract class Graph[VD: ClassTag, ED: ClassTag] extends Serializable {
  // Graph structure
  def vertices: VertexRDD[VD]
  def edges: EdgeRDD[ED]
  def triplets: RDD[EdgeTriplet[VD, ED]]
  
  // Basic operations
  def numEdges: Long
  def numVertices: Long
  def inDegrees: VertexRDD[Int]
  def outDegrees: VertexRDD[Int]
  def degrees: VertexRDD[Int]
  
  // Transformations
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2, tripletFields: TripletFields): Graph[VD, ED2]
  
  // Structural operations
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
               vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
  def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
  
  // Join operations
  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
    (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
  
  // Graph algorithms
  def connectedComponents(): Graph[VertexId, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
  
  // Aggregation
  def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All): VertexRDD[A]
    
  // Pregel API
  def pregel[A: ClassTag](initialMsg: A, maxIterations: Int = Int.MaxValue, activeDirection: EdgeDirection = EdgeDirection.Either)
    (vprog: (VertexId, VD, A) => VD,
     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
     mergeMsg: (A, A) => A): Graph[VD, ED]
     
  // Persistence
  def cache(): Graph[VD, ED]
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def unpersist(blocking: Boolean = true): Graph[VD, ED]
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]
  
  // Checkpointing
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFiles: Seq[String]
}

object Graph {
  def apply[VD: ClassTag, ED: ClassTag](vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]],
                                        defaultVertexAttr: VD = null.asInstanceOf[VD],
                                        edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
                                        vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  
  def fromEdges[VD: ClassTag, ED: ClassTag](edges: RDD[Edge[ED]], defaultValue: VD,
                                            uniqueEdges: Option[PartitionStrategy] = None,
                                            edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
                                            vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  
  def fromEdgeTuples[VD: ClassTag](rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD,
                                   uniqueEdges: Option[PartitionStrategy] = None,
                                   edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
                                   vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]
}

VertexRDD

A specialized RDD for representing vertices in a graph.

abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  // RDD operations optimized for vertices
  def mapVertexPartitions[VD2: ClassTag](f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]): VertexRDD[VD2]
  
  // Join operations
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])
    (f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def leftZipJoin[VD2: ClassTag, VD3: ClassTag](other: VertexRDD[VD2])
    (f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
    (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
    (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
    
  // Aggregation
  def aggregateUsingIndex[VD2: ClassTag](messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]
  
  // Set operations
  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
  def minus(other: VertexRDD[VD]): VertexRDD[VD]
  
  // Conversion
  def toRDD: RDD[(VertexId, VD)]
}

EdgeRDD

A specialized RDD for representing edges in a graph.

abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
  // RDD operations optimized for edges
  def mapEdgePartitions[ED2: ClassTag, VD: ClassTag](f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD]): EdgeRDD[ED2]
  
  // Join operations
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: RDD[Edge[ED2]])
    (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
    
  // Conversion
  def toRDD: RDD[Edge[ED]]
}

Edge and EdgeTriplet

Basic edge data structures.

case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED) extends Serializable

class EdgeTriplet[VD, ED] extends Edge[ED] {
  def srcAttr: VD
  def dstAttr: VD
  def otherVertexAttr(vid: VertexId): VD
  def otherVertexId(vid: VertexId): VertexId
  def relativizeDirection(vid: VertexId): EdgeDirection
  def toTuple: ((VertexId, VD), (VertexId, VD), ED)
}

type VertexId = Long

class EdgeDirection private (private val name: String) extends Serializable {
  def reverse: EdgeDirection
}

object EdgeDirection {
  final val In: EdgeDirection
  final val Out: EdgeDirection
  final val Either: EdgeDirection
  final val Both: EdgeDirection
}

Graph Algorithms

GraphX provides implementations of common graph algorithms.

PageRank

object PageRank {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
  
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  
  def runWithOptions[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15,
                                                 srcId: Option[VertexId] = None): Graph[Double, Double]
}

Usage example:

val graph: Graph[String, String] = // ... create graph
val ranks = graph.pageRank(0.0001).vertices
val topRanks = ranks.top(10)(Ordering.by(_._2))

Connected Components

object ConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxIterations: Int): Graph[VertexId, ED]
}

Strongly Connected Components

object StronglyConnectedComponents {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}

Triangle Count

object TriangleCount {
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}

Label Propagation

object LabelPropagation {
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED]
}

Shortest Paths

object ShortestPaths {
  def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED]
  
  type SPMap = Map[VertexId, Int]
}

Graph Construction and Loading

GraphLoader

Utilities for loading graphs from various formats.

object GraphLoader {
  def edgeListFile(sc: SparkContext, path: String, canonicalOrientation: Boolean = false,
                   numEdgePartitions: Int = -1, edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
                   vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[Int, Int]
}

Usage example:

// Load graph from edge list file
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")

// Edge list file format: srcId dstId (one edge per line)
// Example content:
// 1 2
// 2 3
// 3 1

Graph Partitioning

Control how graph data is distributed across the cluster.

abstract class PartitionStrategy extends Serializable {
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

object PartitionStrategy {
  case object RandomVertexCut extends PartitionStrategy
  case object EdgePartition1D extends PartitionStrategy  
  case object EdgePartition2D extends PartitionStrategy
  case object CanonicalRandomVertexCut extends PartitionStrategy
  
  def fromString(s: String): PartitionStrategy
}

Message Passing

GraphX provides the aggregateMessages API for efficient message passing.

abstract class EdgeContext[VD, ED, A] {
  def srcId: VertexId
  def dstId: VertexId
  def srcAttr: VD
  def dstAttr: VD
  def attr: ED
  def sendToSrc(msg: A): Unit
  def sendToDst(msg: A): Unit
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}

class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean) extends Serializable

object TripletFields {
  val None = TripletFields(false, false, false)
  val EdgeOnly = TripletFields(false, false, true)
  val Src = TripletFields(true, false, false)
  val Dst = TripletFields(false, true, false)
  val All = TripletFields(true, true, true)
}

Usage example:

val graph: Graph[Double, Double] = // ... create graph

// Compute, for every vertex, the sum of its neighbors' attribute values by
// sending each endpoint's attribute across every edge in both directions.
val neighborSum = graph.aggregateMessages[Double](
  triplet => {
    // Send source attribute to destination
    triplet.sendToDst(triplet.srcAttr)
    // Send destination attribute to source
    triplet.sendToSrc(triplet.dstAttr)
  },
  // Merge function: sum all messages arriving at the same vertex
  (a, b) => a + b
)

// Update vertex attributes with neighbor sums.
// NOTE: outerJoinVertices (not joinVertices) is required here — its update
// function receives Option[Double], so vertices that received no messages
// (isolated vertices) can fall back to 0.0. joinVertices passes the joined
// value directly (no Option), so `getOrElse` would not type-check there.
val newGraph = graph.outerJoinVertices(neighborSum) { (vid, oldAttr, msgSum) =>
  msgSum.getOrElse(0.0)
}

Pregel API

The Pregel API is a vertex-centric approach to graph computation.

object Pregel {
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED], initialMsg: A,
                                                     maxIterations: Int = Int.MaxValue,
                                                     activeDirection: EdgeDirection = EdgeDirection.Either)
                                                    (vprog: (VertexId, VD, A) => VD,
                                                     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
                                                     mergeMsg: (A, A) => A): Graph[VD, ED]
}

Usage example:

// Single-source shortest path using Pregel
// Single-source shortest paths via the Pregel API.
// Vertices start at distance 0.0 (the source) or +infinity (all others);
// each superstep relaxes edges whose source vertex offers a shorter path,
// and the computation terminates once no vertex improves.
def shortestPaths[ED: ClassTag](graph: Graph[Double, ED], sourceId: VertexId): Graph[Double, ED] = {
  // Seed: only the source vertex carries a finite (zero) distance.
  val seeded = graph.mapVertices { (id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  }

  seeded.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the smaller of the current and incoming distance.
    (_, current, incoming) => math.min(current, incoming),

    // Send phase: propagate a relaxed distance only when it improves the target.
    edge => {
      val candidate = edge.srcAttr + edge.attr
      if (candidate < edge.dstAttr) Iterator((edge.dstId, candidate))
      else Iterator.empty
    },

    // Combine concurrent messages to a vertex by taking the minimum.
    (a, b) => math.min(a, b)
  )
}

GraphX Utilities

Additional utilities for graph processing.

object GraphGenerators {
  def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0, sigma: Double = 1.3,
                     seed: Long = -1): Graph[Long, Int]
  
  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int, a: Double = 0.45, b: Double = 0.15,
                c: Double = 0.15, d: Double = 0.25, seed: Long = -1, numEParts: Int = 0): Graph[Int, Int]
                
  def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int]
  
  def gridGraph(sc: SparkContext, height: Int, width: Int): Graph[(Int, Int), Double]
}

Usage example:

// Generate a synthetic graph
val syntheticGraph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)

// Run PageRank on synthetic graph
val ranks = syntheticGraph.pageRank(0.001).vertices
val topVertices = ranks.top(10)(Ordering.by(_._2))

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-13

docs

core-engine.md

graph-processing.md

index.md

machine-learning.md

sql-dataframes.md

stream-processing.md

tile.json