Fundamental graph construction, transformation, and analysis operations for building and manipulating distributed graph structures in GraphX.
Create graphs from vertex and edge RDDs, or from edges alone, with statically typed vertex and edge attributes and configurable partitioning and storage levels.
/**
 * Construct a graph from vertex and edge RDDs.
 *
 * Any vertex that appears as an edge endpoint but has no entry in `vertices`
 * is created with `defaultVertexAttr` as its attribute.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 * @param vertices RDD of (VertexId, vertex attribute) pairs
 * @param edges RDD of [[Edge]] objects carrying edge attributes
 * @param defaultVertexAttr attribute for vertices referenced by `edges` but
 *   missing from `vertices`; defaults to `null.asInstanceOf[VD]`, so pass an
 *   explicit value whenever such vertices can occur
 * @param edgeStorageLevel storage level at which to cache the edges if necessary
 * @param vertexStorageLevel storage level at which to cache the vertices if necessary
 * @return a new graph containing the given vertices and edges
 */
def Graph.apply[VD: ClassTag, ED: ClassTag](
  vertices: RDD[(VertexId, VD)],
  edges: RDD[Edge[ED]],
  defaultVertexAttr: VD = null.asInstanceOf[VD],
  edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, ED]
/**
 * Construct a graph from an edge RDD alone.
 *
 * Every vertex referenced by an edge is created with `defaultValue` as its
 * attribute. Duplicate edges are kept as separate edges; use
 * `Graph.fromEdgeTuples` with `uniqueEdges`, or `partitionBy` followed by
 * `groupEdges`, to merge them.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 * @param edges RDD of [[Edge]] objects
 * @param defaultValue attribute assigned to every vertex
 * @param edgeStorageLevel storage level at which to cache the edges if necessary
 * @param vertexStorageLevel storage level at which to cache the vertices if necessary
 * @return a new graph whose vertex set is inferred from the edge endpoints
 */
def Graph.fromEdges[VD: ClassTag, ED: ClassTag](
  edges: RDD[Edge[ED]],
  defaultValue: VD,
  edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, ED]
/**
 * Construct a graph from (source, destination) vertex-ID pairs.
 *
 * Each tuple becomes an edge with attribute `1`. Each referenced vertex gets
 * `defaultValue` as its attribute.
 *
 * @tparam VD vertex attribute type
 * @param rawEdges RDD of (srcId, dstId) tuples
 * @param defaultValue attribute assigned to every vertex
 * @param uniqueEdges when `Some(strategy)`, the edges are repartitioned with
 *   `strategy` and identical (srcId, dstId) pairs are merged into a single edge
 *   whose attribute is the number of occurrences; when `None` (the default),
 *   duplicate tuples remain separate edges
 * @param edgeStorageLevel storage level at which to cache the edges if necessary
 * @param vertexStorageLevel storage level at which to cache the vertices if necessary
 * @return a new graph whose Int edge attributes count occurrences of each tuple
 *   (always `1` when `uniqueEdges` is `None`)
 */
def Graph.fromEdgeTuples[VD: ClassTag](
  rawEdges: RDD[(VertexId, VertexId)],
  defaultValue: VD,
  uniqueEdges: Option[PartitionStrategy] = None,
  edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, Int]

Usage Examples:
import org.apache.spark.graphx._

// Create from vertices and edges
val vertices = sc.parallelize(Array((1L, "Alice"), (2L, "Bob")))
val edges = sc.parallelize(Array(Edge(1L, 2L, "friend")))
val graph = Graph(vertices, edges)

// Create from edges only; vertices 1, 2 and 3 all get the attribute "Unknown".
// Distinct names avoid redefining `edges`/`graph` in the same scope.
val weightedEdges = sc.parallelize(Array(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0)))
val graphFromEdges = Graph.fromEdges(weightedEdges, defaultValue = "Unknown")

// Create from edge tuples; every edge attribute is 1
val edgeTuples = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L)))
val graphFromTuples = Graph.fromEdgeTuples(edgeTuples, defaultValue = 0)

Access basic graph metrics and structure information.
abstract class Graph[VD: ClassTag, ED: ClassTag] {

  /**
   * The vertex set: one (VertexId, attribute) entry per vertex.
   */
  val vertices: VertexRDD[VD]

  /**
   * The edge set: every edge together with its attribute.
   */
  val edges: EdgeRDD[ED]

  /**
   * Every edge paired with the attributes of its source and destination
   * vertices.
   */
  val triplets: RDD[EdgeTriplet[VD, ED]]
}
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Total number of vertices in the graph. */
  def numVertices: Long

  /** Total number of edges in the graph. */
  def numEdges: Long

  /**
   * In-degree of each vertex.
   * @note Vertices with no incoming edges are absent from the result rather
   *   than reported with degree 0.
   */
  def inDegrees: VertexRDD[Int]

  /**
   * Out-degree of each vertex.
   * @note Vertices with no outgoing edges are absent from the result rather
   *   than reported with degree 0.
   */
  def outDegrees: VertexRDD[Int]

  /**
   * Total degree (in-degree + out-degree) of each vertex.
   * @note Vertices with no incident edges are absent from the result rather
   *   than reported with degree 0.
   */
  def degrees: VertexRDD[Int]
}

Transform vertex and edge attributes while preserving graph structure.
/**
 * Transform every vertex attribute with a function of the vertex ID and its
 * current attribute. The vertex and edge sets are unchanged.
 *
 * @tparam VD2 the new vertex attribute type
 * @param map function from (VertexId, current attribute) to the new attribute
 * @return a new graph with the transformed vertex attributes
 */
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
/**
 * Transform every edge attribute with a function of the [[Edge]] (source ID,
 * destination ID and current attribute). Vertex attributes are not available
 * here; use `mapTriplets` when the new value depends on them.
 *
 * @tparam ED2 the new edge attribute type
 * @param map function from an edge to its new attribute
 * @return a new graph with the transformed edge attributes
 */
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
/**
 * Transform every edge attribute with a function of the edge's triplet, which
 * also exposes the attributes of both endpoints.
 *
 * This variant makes every triplet field available. Use the overload taking
 * `TripletFields` when `map` reads only some of them.
 *
 * @tparam ED2 the new edge attribute type
 * @param map function from an [[EdgeTriplet]] to the new edge attribute
 * @return a new graph with the transformed edge attributes
 */
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
/**
 * Transform every edge attribute with a function of the edge's triplet,
 * declaring which triplet fields the function reads.
 *
 * @tparam ED2 the new edge attribute type
 * @param map function from an [[EdgeTriplet]] to the new edge attribute
 * @param tripletFields the triplet fields `map` reads (for example
 *   `TripletFields.Src`); declaring only the fields actually used can improve
 *   performance
 * @return a new graph with the transformed edge attributes
 */
def mapTriplets[ED2: ClassTag](
  map: EdgeTriplet[VD, ED] => ED2,
  tripletFields: TripletFields
): Graph[VD, ED2]
/**
 * Reverse the direction of every edge: an edge (src, dst, attr) becomes
 * (dst, src, attr). Vertex attributes are unchanged.
 *
 * @return a new graph with all edges reversed
 */
def reverse: Graph[VD, ED]

Usage Examples:
// Transform vertex attributes
val upperGraph = graph.mapVertices((id, name) => name.toUpperCase)

// Transform edge attributes (here: the length of each String label)
val weightedGraph = graph.mapEdges(edge => edge.attr.length)

// Transform edges using adjacent vertex data
val labeledGraph = graph.mapTriplets(triplet =>
  s"${triplet.srcAttr}->${triplet.dstAttr}")

// Reverse all edges
val reversedGraph = graph.reverse

Filter graphs by vertex and edge predicates to create subgraphs.
/**
 * Restrict the graph to the vertices and edges that satisfy the predicates.
 *
 * A vertex is kept when `vpred` holds for it. An edge is kept only when `epred`
 * holds for its triplet AND both of its endpoints are kept, so removing a
 * vertex also removes every edge incident to it.
 *
 * @param epred edge predicate over triplets; by default every edge passes
 * @param vpred vertex predicate over (VertexId, attribute); by default every
 *   vertex passes
 * @return the subgraph of kept vertices and edges, with unchanged attributes
 */
def subgraph(
  epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
  vpred: (VertexId, VD) => Boolean = ((v, d) => true)
): Graph[VD, ED]
/**
 * Restrict this graph to the vertices and edges that are also present in
 * `other`.
 *
 * Attributes are always taken from this graph. `other` only defines which
 * vertices and edges survive, which is why its attribute types are
 * unconstrained.
 *
 * @tparam VD2 vertex attribute type of `other` (not used in the result)
 * @tparam ED2 edge attribute type of `other` (not used in the result)
 * @param other graph defining the mask
 * @return the intersection of this graph with `other`, carrying this graph's
 *   attributes
 */
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
/**
 * Filter the graph using predicates evaluated on a preprocessed view of it.
 *
 * `preprocess` derives a graph whose attributes are convenient to test (for
 * example, one with vertex degrees joined in). `epred` and `vpred` are
 * evaluated on that derived graph. The surviving vertices and edges are
 * returned with their ORIGINAL attributes of type `VD`/`ED`.
 *
 * @tparam VD2 vertex attribute type of the preprocessed graph
 * @tparam ED2 edge attribute type of the preprocessed graph
 * @param preprocess function computing the graph the predicates are tested on
 * @param epred edge predicate on preprocessed triplets; by default every edge passes
 * @param vpred vertex predicate on preprocessed vertices; by default every
 *   vertex passes
 * @return the filtered graph with this graph's attributes
 */
def filter[VD2: ClassTag, ED2: ClassTag](
  preprocess: Graph[VD, ED] => Graph[VD2, ED2],
  epred: EdgeTriplet[VD2, ED2] => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
  vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true
): Graph[VD, ED]

Join graphs with RDDs and perform message-passing aggregation operations.
/**
 * Join the vertices with an RDD, updating the attribute of each vertex that has
 * a matching entry.
 *
 * The attribute type `VD` is preserved; use `outerJoinVertices` to change it.
 * Vertices with no entry in `table` keep their original attribute, and
 * `mapFunc` is not called for them.
 *
 * @tparam U type of the values in `table`
 * @param table RDD of (VertexId, U) pairs to join; should contain at most one
 *   entry per vertex
 * @param mapFunc combines (VertexId, current attribute, table value) into the
 *   new attribute
 * @return a new graph with the updated vertex attributes
 */
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(
  mapFunc: (VertexId, VD, U) => VD
): Graph[VD, ED]
/**
 * Left outer join the vertices with an RDD, producing a new attribute, possibly
 * of a different type, for EVERY vertex.
 *
 * `mapFunc` receives `Some(u)` for vertices that have an entry in `other` and
 * `None` for vertices that do not.
 *
 * @tparam U type of the values in `other`
 * @tparam VD2 the new vertex attribute type
 * @param other RDD of (VertexId, U) pairs to join; should contain at most one
 *   entry per vertex
 * @param mapFunc combines (VertexId, current attribute, optional joined value)
 *   into the new attribute
 * @return a new graph with the joined vertex attributes
 */
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
  mapFunc: (VertexId, VD, Option[U]) => VD2
): Graph[VD2, ED]
/**
 * Core message-passing aggregation API.
 *
 * `sendMsg` is invoked on each edge and may send messages to either endpoint
 * through `EdgeContext.sendToSrc` / `EdgeContext.sendToDst`. Messages addressed
 * to the same vertex are combined with `mergeMsg`.
 *
 * @tparam A message type
 * @param sendMsg function emitting messages along an edge
 * @param mergeMsg combines two messages destined for the same vertex; should
 *   be commutative and associative
 * @param tripletFields the [[EdgeContext]] fields `sendMsg` reads; declaring
 *   fewer than `TripletFields.All` can improve performance
 * @return a VertexRDD holding the merged message for each vertex that received
 *   at least one message; vertices that received none are absent
 */
def aggregateMessages[A: ClassTag](
  sendMsg: EdgeContext[VD, ED, A] => Unit,
  mergeMsg: (A, A) => A,
  tripletFields: TripletFields = TripletFields.All
): VertexRDD[A]

Usage Examples:
// Join with user ages. joinVertices must return the existing attribute type
// (String here), so use outerJoinVertices to attach the age as a new field;
// `age` is None for vertices missing from `ages`.
val ages = sc.parallelize(Array((1L, 25), (2L, 30)))
val graphWithAges = graph.outerJoinVertices(ages)((id, name, age) => (name, age))

// Subgraph filtering
val activeUsers = graph.subgraph(
  vpred = (id, user) => user.active,
  epred = triplet => triplet.attr == "friend"
)

// Message aggregation - compute in-degrees.
// sendMsg reads no vertex or edge attributes, so declare TripletFields.None.
val inDegrees = graph.aggregateMessages[Int](
  sendMsg = ctx => ctx.sendToDst(1),
  mergeMsg = (a, b) => a + b,
  tripletFields = TripletFields.None
)

Control caching, persistence, and fault tolerance for iterative graph algorithms.
/**
 * Persist the vertices and edges of this graph at the given storage level,
 * ignoring any target storage levels set when the graph was constructed.
 *
 * @param newLevel storage level for both vertices and edges
 * @return this graph, marked for persistence at `newLevel`
 */
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
/**
 * Cache the vertices and edges of this graph at the target storage levels
 * chosen when the graph was constructed (the `edgeStorageLevel` /
 * `vertexStorageLevel` arguments), which default to MEMORY_ONLY.
 *
 * @return this graph, marked for caching
 */
def cache(): Graph[VD, ED]
/**
 * Mark this graph for checkpointing, which enables fault tolerance by
 * truncating lineage.
 *
 * The data is saved to files inside the directory set with
 * `SparkContext.setCheckpointDir()`, and all references to parent RDDs are
 * removed. Persist the graph in memory first; otherwise writing the checkpoint
 * requires recomputing it.
 */
def checkpoint(): Unit
/**
 * Remove both the vertices and the edges of this graph from the cache.
 *
 * This is useful in iterative algorithms that build a new graph each iteration
 * and no longer need the previous one.
 *
 * @param blocking whether to block until all cached blocks are removed
 * @return this graph, no longer persisted
 */
def unpersist(blocking: Boolean = true): Graph[VD, ED]
/**
 * Repartition the edges of the graph according to `partitionStrategy`, keeping
 * the current number of edge partitions.
 *
 * @param partitionStrategy strategy deciding which partition each edge goes to
 * @return the repartitioned graph
 */
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

/** 64-bit vertex identifier */
type VertexId = scala.Long
/**
 * Identifier of a graph partition, stored as an Int; valid values are
 * strictly below 2^30.
 */
type PartitionID = scala.Int
/**
 * A directed edge from `srcId` to `dstId` carrying an attribute of type `ED`.
 *
 * The constructor parameters have defaults so that subclasses declared as
 * `extends Edge[ED]` without arguments (such as `EdgeTriplet`) are valid.
 *
 * @param srcId ID of the source vertex
 * @param dstId ID of the destination vertex
 * @param attr the edge attribute
 */
case class Edge[ED](
  srcId: VertexId = 0,
  dstId: VertexId = 0,
  attr: ED = null.asInstanceOf[ED]
) {
  /**
   * Given one endpoint of this edge, return the other one.
   *
   * @param vid ID of either the source or the destination vertex
   * @return ID of the opposite endpoint
   */
  def otherVertexId(vid: VertexId): VertexId

  /**
   * Direction of this edge relative to one of its endpoints.
   *
   * @param vid ID of either the source or the destination vertex
   * @return `EdgeDirection.Out` when `vid` is the source, `EdgeDirection.In`
   *   when it is the destination
   */
  def relativeDirection(vid: VertexId): EdgeDirection
}
/**
 * An [[Edge]] enriched with the attributes of both of its endpoints, as passed
 * to triplet-based operations such as `mapTriplets` and `subgraph`.
 */
class EdgeTriplet[VD, ED] extends Edge[ED] {

  /** Attribute of the source vertex (`srcId`). */
  val srcAttr: VD

  /** Attribute of the destination vertex (`dstId`). */
  val dstAttr: VD

  /**
   * Given one endpoint of this edge, return the attribute of the opposite
   * endpoint.
   *
   * @param vid ID of either the source or the destination vertex
   */
  def otherVertexAttr(vid: VertexId): VD

  /**
   * Return the attribute of the given endpoint.
   *
   * @param vid ID of either the source or the destination vertex
   */
  def vertexAttr(vid: VertexId): VD

  /** This triplet as ((srcId, srcAttr), (dstId, dstAttr), attr). */
  def toTuple: ((VertexId, VD), (VertexId, VD), ED)
}
/**
 * Per-edge view passed to `sendMsg` in `aggregateMessages`.
 *
 * It exposes the edge's endpoint IDs and attributes and lets the function send
 * messages of type `A` to either endpoint.
 */
abstract class EdgeContext[VD, ED, A] {
  /** ID of the source vertex. */
  val srcId: VertexId
  /** ID of the destination vertex. */
  val dstId: VertexId
  /** Attribute of the source vertex. */
  val srcAttr: VD
  /** Attribute of the destination vertex. */
  val dstAttr: VD
  /** Attribute of the edge. */
  val attr: ED

  /** Send a message to the source vertex; messages are combined with `mergeMsg`. */
  def sendToSrc(msg: A): Unit
  /** Send a message to the destination vertex; messages are combined with `mergeMsg`. */
  def sendToDst(msg: A): Unit
  /** Convert this context into an [[EdgeTriplet]] with the same IDs and attributes. */
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}

/**
 * Specialized RDD of (VertexId, attribute) pairs.
 *
 * It holds at most one entry per vertex and is pre-indexed, so joins against
 * RDDs that share the same index are efficient.
 */
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  /**
   * Construct a new VertexRDD indexed by only the visible vertices.
   *
   * The result uses a different index and can no longer be joined quickly with
   * this RDD.
   */
  def reindex(): VertexRDD[VD]

  /** Transform each vertex attribute, preserving the index. */
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  /** Transform each vertex attribute using its ID, preserving the index. */
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

  /** Keep only the vertices satisfying `pred`, preserving the index. */
  def filter(pred: (VertexId, VD) => Boolean): VertexRDD[VD]

  /**
   * Set difference by vertex ID: return the vertices of this RDD whose IDs do
   * not appear in `other`.
   */
  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]

  /**
   * Among vertices present in BOTH this RDD and `other`, return those whose
   * attributes differ, carrying the attribute from `other`.
   *
   * Vertices present on only one side are not returned. Only guaranteed to work
   * when both RDDs share a common ancestor.
   */
  def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]

  /**
   * Left join with another RDD. Every vertex of this RDD appears in the result;
   * `f` receives `None` when `other` has no entry for it. If `other` has
   * several entries for a vertex, one is chosen arbitrarily.
   */
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
    f: (VertexId, VD, Option[VD2]) => VD3
  ): VertexRDD[VD3]

  /**
   * Inner join with another RDD. Only vertices present in both are kept.
   * Joining against a VertexRDD with the same index is more efficient.
   */
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
    f: (VertexId, VD, U) => VD2
  ): VertexRDD[VD2]
}
/**
 * Specialized RDD of [[Edge]]s that stores the edges of each partition in a
 * columnar format.
 */
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
  /**
   * Transform every edge attribute, keeping each edge's source and destination
   * IDs and its partition.
   */
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

  /** Reverse every edge: (src, dst, attr) becomes (dst, src, attr). */
  def reverse: EdgeRDD[ED]

  /**
   * Inner join with another EdgeRDD, keeping only edges (matched on srcId and
   * dstId) present in both.
   *
   * Both RDDs must be partitioned with the same `PartitionStrategy`.
   *
   * @param f combines (srcId, dstId, this attribute, other attribute) into the
   *   new attribute
   */
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(
    f: (VertexId, VertexId, ED, ED2) => ED3
  ): EdgeRDD[ED3]
}