CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-spark--spark-parent-2-12

Apache Spark - Unified analytics engine for large-scale data processing

Pending
Overview
Eval results
Files

docs/graphx.md

Graph Processing

GraphX provides APIs for graphs and graph-parallel computation with fundamental operators like subgraph, joinVertices, and aggregateMessages, plus optimized variants of Pregel and graph algorithms including PageRank, connected components, and triangle counting.

Capabilities

Graph[VD, ED]

Main graph abstraction representing a directed multigraph with user-defined vertex and edge attributes.

/**
 * Main graph abstraction: a directed multigraph with user-defined
 * attributes attached to every vertex and edge. All transformations
 * return a new graph; the receiver is never modified in place.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  /** Vertices of the graph, keyed by VertexId, with their attributes. */
  val vertices: VertexRDD[VD]
  /** Edges of the graph with their attributes. */
  val edges: EdgeRDD[ED]
  /** Each edge joined with the attributes of its source and destination vertices. */
  val triplets: RDD[EdgeTriplet[VD, ED]]
  
  /** Transform vertex attributes; the graph structure is unchanged. */
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  /** Transform edge attributes; the graph structure is unchanged. */
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  /** Transform edge attributes with access to both endpoint vertex attributes. */
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  
  /** Reverse the direction of every edge. */
  def reverse: Graph[VD, ED]
  /**
   * Restrict the graph to the triplets satisfying `epred` and the
   * vertices satisfying `vpred`. Both predicates default to accept-all.
   */
  def subgraph(
    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    vpred: (VertexId, VD) => Boolean = ((v, d) => true)
  ): Graph[VD, ED]
  
  /**
   * Join vertices with an external keyed RDD and update attributes via
   * `mapFunc`. The attribute type stays VD (see the return type), so
   * vertices without a matching entry keep their current attribute.
   */
  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  /**
   * Aggregate messages sent along edges. `sendMsg` emits messages from each
   * edge context; `mergeMsg` combines messages bound for the same vertex.
   * Returns the per-vertex aggregate — the graph itself is NOT updated.
   * `tripletFields` hints which triplet fields `sendMsg` reads so unused
   * data need not be shipped.
   */
  def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All
  ): VertexRDD[A]
  
  /** Number of vertices in the graph. */
  def numVertices: Long
  /** Number of edges in the graph. */
  def numEdges: Long
  /** In-degree of each vertex. */
  def inDegrees: VertexRDD[Int]
  /** Out-degree of each vertex. */
  def outDegrees: VertexRDD[Int]
  /** Total (in + out) degree of each vertex. */
  def degrees: VertexRDD[Int]
  
  /** Persist the graph's vertices and edges at the given storage level. */
  def persist(newLevel: StorageLevel): Graph[VD, ED]
  /** Shorthand for persisting in memory. */
  def cache(): Graph[VD, ED]
  /** Release cached data; `blocking = false` returns without waiting. */
  def unpersist(blocking: Boolean = false): Graph[VD, ED]
}

Usage Examples:

import org.apache.spark.graphx._

// Build a small directed graph: four named users and three relationships.
val users: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie"), (4L, "David")
))

val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "follow"),
  Edge(3L, 4L, "friend")
))

val graph = Graph(users, relationships)

// Uppercase every vertex attribute; structure is untouched.
val upperGraph = graph.mapVertices((_, name) => name.toUpperCase)

// Keep only vertices whose name is longer than three characters.
val youngUsers = graph.subgraph(vpred = (_, name) => name.length > 3)

// Compute vertex degrees by hand with aggregateMessages:
// each edge contributes 1 to both of its endpoints.
val degrees = graph.aggregateMessages[Int](
  ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) },
  _ + _
)

Graph Construction

Factory methods and utilities for creating graphs.

/**
 * Factory methods for constructing [[Graph]] instances.
 */
object Graph {
  /**
   * Create a graph from a vertex RDD and an edge RDD.
   *
   * Vertices referenced by edges but absent from `vertices` receive
   * `defaultVertexAttr`. The `null.asInstanceOf[VD]` default matches the
   * Spark API and makes the common two-argument call
   * `Graph(vertices, edges)` (used in the examples in this document) valid.
   */
  def apply[VD: ClassTag, ED: ClassTag](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD = null.asInstanceOf[VD]
  ): Graph[VD, ED]
  
  /** Create a graph from edges only; every vertex gets `defaultValue`. */
  def fromEdges[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultValue: VD
  ): Graph[VD, ED]
  
  /**
   * Create a graph from raw (srcId, dstId) tuples; edge attributes are Int
   * (1 per tuple, or the duplicate count when `uniqueEdges` merges edges
   * using the given partition strategy).
   */
  def fromEdgeTuples[VD: ClassTag](
    rawEdges: RDD[(VertexId, VertexId)],
    defaultValue: VD,
    uniqueEdges: Option[PartitionStrategy] = None
  ): Graph[VD, Int]
}

/**
 * Utilities for loading graphs from files on local disk or HDFS.
 */
object GraphLoader {
  /**
   * Load a graph from an edge-list file of (srcId, dstId) pairs.
   * Returns a Graph whose vertex and edge attributes are Int.
   *
   * @param canonicalOrientation when true, orient each edge so srcId < dstId
   *        (required by some algorithms such as triangle counting)
   * @param numEdgePartitions desired number of edge partitions; -1 keeps the default
   * @param edgeStorageLevel storage level used when caching edges
   * @param vertexStorageLevel storage level used when caching vertices
   */
  def edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    numEdgePartitions: Int = -1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[Int, Int]
}

Usage Examples:

// Load a graph directly from an edge-list file (Int vertex/edge attributes).
val graph = GraphLoader.edgeListFile(sc, "hdfs://path/to/edges.txt")

// Build a graph from raw (src, dst) id pairs; every vertex shares one default value.
val rawPairs = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L)))
val tupleGraph = Graph.fromEdgeTuples(rawPairs, "defaultVertex")

// Build a graph from Edge objects alone; vertices not listed get "missing".
val weightedEdges = sc.parallelize(Array(
  Edge(1L, 2L, 1.0),
  Edge(2L, 3L, 2.0)
))
val edgeGraph = Graph.fromEdges(weightedEdges, "missing")

Core Types

Fundamental types used in graph processing.

/** Vertex identifier type: a 64-bit integer key. */
type VertexId = Long

/**
 * A directed edge from `srcId` to `dstId` carrying attribute `attr`.
 *
 * Default parameter values (ids 0, null attribute) mirror the upstream
 * Spark `Edge` API and are backward compatible: all existing three-argument
 * construction sites continue to work unchanged.
 */
case class Edge[ED](
  srcId: VertexId = 0,
  dstId: VertexId = 0,
  attr: ED = null.asInstanceOf[ED]
)

/**
 * An edge enriched with the attributes of its two endpoint vertices.
 * Inherits `srcId`, `dstId`, and `attr` from [[Edge]].
 */
class EdgeTriplet[VD, ED] extends Edge[ED] {
  /** Source vertex attribute (default-initialized until `set` is called). */
  var srcAttr: VD = _
  /** Destination vertex attribute (default-initialized until `set` is called). */
  var dstAttr: VD = _
  
  /** Populate this triplet from an edge plus both endpoint attributes and return the populated triplet. */
  def set(other: Edge[ED], srcAttr: VD, dstAttr: VD): EdgeTriplet[VD, ED]
}

/**
 * Per-edge context handed to `sendMsg` in `aggregateMessages`: exposes the
 * edge, its endpoint attributes, and the ability to send a message of type
 * A to either endpoint.
 */
abstract class EdgeContext[VD, ED, A] {
  /** ID of the edge's source vertex. */
  def srcId: VertexId
  /** ID of the edge's destination vertex. */
  def dstId: VertexId
  /** Attribute of the source vertex. */
  def srcAttr: VD
  /** Attribute of the destination vertex. */
  def dstAttr: VD
  /** Attribute of the edge itself. */
  def attr: ED
  
  /** Send a message to the source vertex; messages are combined by `mergeMsg`. */
  def sendToSrc(msg: A): Unit
  /** Send a message to the destination vertex; messages are combined by `mergeMsg`. */
  def sendToDst(msg: A): Unit
}

VertexRDD[VD]

Specialized RDD for vertices with efficient joins and graph operations.

/**
 * An RDD of (VertexId, attribute) pairs indexed by vertex id, enabling
 * efficient joins with other VertexRDDs built from the same index.
 */
abstract class VertexRDD[VD: ClassTag] extends RDD[(VertexId, VD)] {
  /** Keep only vertices satisfying the predicate (tuple form, matching RDD.filter). */
  def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  // NOTE(review): this two-argument overload is not part of the upstream
  // Spark VertexRDD API — confirm it exists in the target build.
  def filter(pred: (VertexId, VD) => Boolean): VertexRDD[VD]
  
  /** Transform attributes without touching the index (value-only form). */
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  /** Transform attributes with access to the vertex id. */
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
  
  // NOTE(review): per Spark's VertexRDD.diff, this returns vertices whose
  // attribute *differs* between the two sets, not simply set difference —
  // the original description ("in this but not in other") looks inaccurate;
  // confirm against the Spark scaladoc.
  def diff(other: VertexRDD[VD]): VertexRDD[VD]
  
  /** Left outer join: `f` receives None where `other` has no entry. */
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
    f: (VertexId, VD, Option[VD2]) => VD3
  ): VertexRDD[VD3]
  
  /** Inner join: only vertices present in both RDDs are kept. */
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
    f: (VertexId, VD, U) => VD2
  ): VertexRDD[VD2]
  
  /** Reduce `messages` by vertex id using this RDD's index for efficiency. */
  def aggregateUsingIndex[VD2: ClassTag](
    messages: RDD[(VertexId, VD2)],
    reduceFunc: (VD2, VD2) => VD2
  ): VertexRDD[VD2]
}

EdgeRDD[ED]

Specialized RDD for edges with graph-specific optimizations.

/**
 * An RDD of edges stored in a graph-optimized columnar layout.
 */
abstract class EdgeRDD[ED: ClassTag] extends RDD[Edge[ED]] {
  /** Transform edge attributes; endpoints are unchanged. */
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
  
  /** Reverse the direction of every edge. */
  def reverse: EdgeRDD[ED]
  
  /** Keep only edges whose triplet satisfies the predicate. */
  def filter(pred: EdgeTriplet[_, ED] => Boolean): EdgeRDD[ED]
  
  /** Join edge endpoints with vertex attributes; `f` sees (srcId, dstId, edge attr, src attr, dst attr). */
  def innerJoin[VD: ClassTag, ED2: ClassTag](other: VertexRDD[VD])(
    f: (VertexId, VertexId, ED, VD, VD) => ED2
  ): EdgeRDD[ED2]
}

Graph Algorithms

Pre-implemented graph algorithms available through GraphOps.

/**
 * Algorithmic operations made available on every Graph through implicit
 * conversion.
 */
implicit class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Dynamic PageRank: iterate until rank changes fall below `tol`; result vertex attr = rank, edge attr = weight. */
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  
  /** PageRank for a fixed number of iterations (no convergence check). */
  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]
  
  /** Label each vertex with the id of its (weakly) connected component. */
  def connectedComponents(): Graph[VertexId, ED]
  
  /** Label each vertex with the number of triangles passing through it. */
  def triangleCount(): Graph[Int, ED]
  
  /** Label each vertex with its strongly connected component id, using at most `numIter` iterations. */
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
  
  /** IDs of each vertex's neighbors along edges of the given direction. */
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]
  
  /** (id, attribute) pairs of each vertex's neighbors along edges of the given direction. */
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]
}

/**
 * Direction of edges relative to a vertex, used by neighbor collection
 * (`collectNeighborIds`/`collectNeighbors`) and Pregel's `activeDirection`.
 *
 * Modeled as a sealed trait with case objects rather than scala.Enumeration:
 * with an Enumeration, `EdgeDirection` is an object, not a type, so the
 * signatures elsewhere in this document (`edgeDirection: EdgeDirection`,
 * `activeDirection: EdgeDirection = EdgeDirection.Either`) would not
 * type-check. This form keeps `EdgeDirection.In` etc. working unchanged.
 */
sealed trait EdgeDirection

object EdgeDirection {
  /** Edges arriving at the vertex (vertex is the destination). */
  case object In extends EdgeDirection
  /** Edges leaving the vertex (vertex is the source). */
  case object Out extends EdgeDirection
  /** Edges where the vertex is either endpoint. */
  case object Either extends EdgeDirection
  /** Edges in both directions — TODO confirm exact semantics against the Spark scaladoc. */
  case object Both extends EdgeDirection
}

Usage Examples:

import org.apache.spark.graphx.lib._

// PageRank: iterate until per-vertex ranks move less than the tolerance.
val ranked = graph.pageRank(0.0001, 0.15)
val pageRanks = ranked.vertices.collect()

// Connected components: each vertex carries its component's id.
val cc = graph.connectedComponents()
val components = cc.vertices.collect()

// Triangle count: triangles passing through each vertex.
val triangles = graph.triangleCount()
val triangleCounts = triangles.vertices.collect()

// Out-neighbor ids for every vertex.
val neighbors = graph.collectNeighborIds(EdgeDirection.Out)

Pregel API

Pregel-style bulk-synchronous message-passing abstraction.

/**
 * Bulk-synchronous parallel message-passing computation in the style of
 * Google's Pregel model.
 */
object Pregel {
  /**
   * Run supersteps until no messages remain or `maxIterations` is reached.
   *
   * @param graph input graph
   * @param initialMsg message delivered to every vertex before the first superstep
   * @param maxIterations upper bound on the number of supersteps
   * @param activeDirection which edges may emit messages in a superstep
   * @param vprog vertex program: folds an incoming message into the vertex attribute
   * @param sendMsg emits (targetVertexId, message) pairs from each triplet
   * @param mergeMsg combines two messages bound for the same vertex;
   *        should be commutative and associative (per the Spark Pregel docs)
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either
  )(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A
  ): Graph[VD, ED]
}

Usage Examples:

// Single Source Shortest Path via Pregel: vertex values hold the best known
// distance from sourceId; messages carry candidate distances.
def shortestPaths(graph: Graph[Double, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Seed the computation: distance 0 at the source, +infinity everywhere else.
  val seeded = graph.mapVertices((vid, _) =>
    if (vid == sourceId) 0.0 else Double.PositiveInfinity
  )

  Pregel(seeded, Double.PositiveInfinity)(
    // Vertex program: keep the smaller of the stored and incoming distance.
    (_, current, incoming) => math.min(current, incoming),
    // Relax each edge: propose (src distance + edge weight) to the
    // destination only when it improves on the destination's current value.
    edge => {
      val candidate = edge.srcAttr + edge.attr
      if (candidate < edge.dstAttr) Iterator((edge.dstId, candidate))
      else Iterator.empty
    },
    // Combine competing proposals by taking the minimum.
    (a, b) => math.min(a, b)
  )
}

Graph Partitioning

Strategies for partitioning graphs across cluster nodes.

/**
 * Strategies for assigning edges to partitions when distributing a graph.
 *
 * Modeled as a sealed trait with case objects rather than scala.Enumeration:
 * with an Enumeration, `PartitionStrategy` is an object, not a type, so the
 * signatures elsewhere in this document (`uniqueEdges: Option[PartitionStrategy]`,
 * `partitionBy(partitionStrategy: PartitionStrategy)`) would not type-check.
 * All existing references such as `PartitionStrategy.EdgePartition2D`
 * continue to work unchanged.
 */
sealed trait PartitionStrategy

object PartitionStrategy {
  /** Assign each edge to a partition by hashing both endpoint ids. */
  case object RandomVertexCut extends PartitionStrategy
  /** Assign each edge by hashing its source vertex id only. */
  case object EdgePartition1D extends PartitionStrategy
  /** Two-dimensional, sparse-matrix-style partitioning of the edge set. */
  case object EdgePartition2D extends PartitionStrategy
  /** Like RandomVertexCut, but colocates edges between the same vertex pair regardless of direction. */
  case object CanonicalRandomVertexCut extends PartitionStrategy
}

/**
 * Repartitioning operations made available on Graph through implicit
 * conversion.
 */
implicit class GraphPartitioning[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Repartition the graph's edges using the given strategy, keeping the current partition count. */
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  /** Repartition the graph's edges using the given strategy into `numPartitions` partitions. */
  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
}

Graph I/O

Loading and saving graphs from various formats.

/**
 * Convenience wrappers for saving a graph's parts as text files.
 *
 * NOTE(review): the usage examples in this document call `saveAsTextFile`
 * directly on `graph.vertices` / `graph.edges` rather than these wrappers,
 * and these methods are not part of the upstream Spark GraphX API —
 * confirm they exist in the target build before relying on them.
 */
implicit class GraphWriter[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Write the vertex RDD as text to `path`. */
  def saveVerticesAsTextFile(path: String): Unit
  /** Write the edge RDD as text to `path`. */
  def saveEdgesAsTextFile(path: String): Unit
}

Usage Examples:

// Repartition the edge set with the two-dimensional strategy.
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 4)

// Persist the graph's parts as text.
graph.vertices.saveAsTextFile("hdfs://path/to/vertices")
graph.edges.saveAsTextFile("hdfs://path/to/edges")

// End-to-end analysis: rank users, then group them into communities.
val socialGraph = Graph.fromEdgeTuples(friendships, "User")

// Highest-ranked users first.
val pageRanks = socialGraph
  .pageRank(0.0001)
  .vertices
  .sortBy(_._2, ascending = false)

// Invert (userId, componentId) and gather members per component.
val communities = socialGraph
  .connectedComponents()
  .vertices
  .map(pair => (pair._2, pair._1))
  .groupByKey()
  .collect()

Error Handling

Common GraphX exceptions:

  • IllegalArgumentException - Invalid graph construction parameters
  • SparkException - General Spark execution errors during graph operations
  • ClassCastException - Type mismatches in vertex/edge attributes

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-parent-2-12

docs

core.md

deployment.md

graphx.md

index.md

ml.md

sql.md

streaming.md

tile.json