or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-graph-api.md, graph-algorithms.md, index.md, pregel-api.md, utilities.md
tile.json

docs/core-graph-api.md

Core Graph API

Fundamental graph construction, transformation, and analysis operations for building and manipulating distributed graph structures in GraphX.

Capabilities

Graph Construction

Create graphs from vertex and edge RDDs with full type safety and optimized partitioning.

/**
 * Construct a graph from vertex and edge RDDs.
 *
 * Vertices that an edge refers to but that are missing from `vertices` receive
 * `defaultVertexAttr`.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 * @param vertices RDD of (VertexId, VertexAttribute) pairs
 * @param edges RDD of Edge objects with attributes
 * @param defaultVertexAttr Default attribute for vertices not in vertices RDD; a plain
 *                          VD value (not an Option) that defaults to null
 * @param edgeStorageLevel Storage level for edges
 * @param vertexStorageLevel Storage level for vertices
 * @return New graph instance
 */
def Graph.apply[VD: ClassTag, ED: ClassTag](
  vertices: RDD[(VertexId, VD)],
  edges: RDD[Edge[ED]],
  defaultVertexAttr: VD = null.asInstanceOf[VD],
  edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, ED]

/**
 * Construct a graph from just edges, creating vertices with default attributes.
 *
 * Every vertex ID mentioned by an edge becomes a vertex whose attribute is `defaultValue`.
 * This builder has no `uniqueEdges` option; duplicate edges are kept as given.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 * @param edges RDD of edges
 * @param defaultValue Default vertex attribute
 * @param edgeStorageLevel Storage level for edges
 * @param vertexStorageLevel Storage level for vertices
 * @return New graph with inferred vertices
 */
def Graph.fromEdges[VD: ClassTag, ED: ClassTag](
  edges: RDD[Edge[ED]],
  defaultValue: VD,
  edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, ED]

/**
 * Construct a graph from edge tuples with integer edge attributes
 * @tparam VD vertex attribute type
 * @param rawEdges RDD of (srcId, dstId) tuples
 * @param defaultValue Default vertex attribute
 * @param uniqueEdges Optional partition strategy (not a Boolean flag). Per the GraphX API
 *                    docs, when defined the edges are repartitioned with it and duplicate
 *                    edges are merged, the edge attribute holding the duplicate count;
 *                    when None (the default) every edge keeps attribute 1 — confirm
 *                    against the targeted Spark version
 * @param edgeStorageLevel Storage level for edges
 * @param vertexStorageLevel Storage level for vertices
 * @return New graph with integer edge weights
 */
def Graph.fromEdgeTuples[VD: ClassTag](
  rawEdges: RDD[(VertexId, VertexId)],
  defaultValue: VD,
  uniqueEdges: Option[PartitionStrategy] = None,
  edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
  vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
): Graph[VD, Int]

Usage Examples:

import org.apache.spark.graphx._

// Create from vertices and edges (String vertex names, String edge labels)
val vertices = sc.parallelize(Array((1L, "Alice"), (2L, "Bob")))
val edges = sc.parallelize(Array(Edge(1L, 2L, "friend")))
val graph = Graph(vertices, edges)

// Create from edges only.
// Distinct names are used because a `val` cannot be declared twice in one scope.
val weightedEdges = sc.parallelize(Array(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0)))
val edgeGraph = Graph.fromEdges(weightedEdges, defaultValue = "Unknown")

// Create from edge tuples
val edgeTuples = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L)))
val tupleGraph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 0)

Graph Properties

Access basic graph metrics and structure information.

/**
 * Abstract graph exposing its data as three views: vertices, edges, and triplets.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  /** RDD containing vertices and their attributes */
  val vertices: VertexRDD[VD]
  
  /** RDD containing edges and their attributes */
  val edges: EdgeRDD[ED]
  
  /** RDD of edge triplets with adjacent vertex attributes */
  val triplets: RDD[EdgeTriplet[VD, ED]]
}

/**
 * Size and degree metrics for a graph.
 *
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 * @param graph the graph these operations are defined on
 */
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Total number of vertices in the graph */
  def numVertices: Long
  
  /** Total number of edges in the graph */  
  def numEdges: Long
  
  /** In-degree of each vertex, keyed by vertex ID */
  def inDegrees: VertexRDD[Int]
  
  /** Out-degree of each vertex, keyed by vertex ID */
  def outDegrees: VertexRDD[Int]
  
  /** Total degree (in + out) of each vertex, keyed by vertex ID */
  def degrees: VertexRDD[Int]
}

Graph Transformations

Transform vertex and edge attributes while preserving graph structure.

/**
 * Transform vertex attributes using a mapping function
 * @tparam VD2 the new vertex attribute type
 * @param map Function transforming (VertexId, VertexAttribute) to new attribute
 * @return New graph with transformed vertex attributes; the edge attribute type is unchanged
 */
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

/**
 * Transform edge attributes using edge objects
 * @tparam ED2 the new edge attribute type
 * @param map Function transforming Edge to new edge attribute
 * @return New graph with transformed edge attributes; the vertex attribute type is unchanged
 */
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

/**
 * Transform edge attributes using triplets (includes adjacent vertex data)
 * @tparam ED2 the new edge attribute type
 * @param map Function transforming EdgeTriplet to new edge attribute
 * @return New graph with transformed edge attributes; the vertex attribute type is unchanged
 */
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

/**
 * Transform edge attributes using triplets with optimization hints
 * @tparam ED2 the new edge attribute type
 * @param map Function transforming EdgeTriplet to new edge attribute
 * @param tripletFields Fields accessed by map function for optimization
 * @return New graph with transformed edge attributes; the vertex attribute type is unchanged
 */
def mapTriplets[ED2: ClassTag](
  map: EdgeTriplet[VD, ED] => ED2,
  tripletFields: TripletFields
): Graph[VD, ED2]

/**
 * Reverse the direction of all edges
 * @return New graph with reversed edges and the same attribute types
 */
def reverse: Graph[VD, ED]

Usage Examples:

// Upper-case every vertex name (the vertex ID is not needed here)
val upperGraph = graph.mapVertices((_, name) => name.toUpperCase)

// Replace each edge attribute with its length
val weightedGraph = graph.mapEdges(_.attr.length)

// Label each edge with the attributes of its two endpoints
val labeledGraph = graph.mapTriplets { t =>
  s"${t.srcAttr}->${t.dstAttr}"
}

// Flip the direction of every edge
val reversedGraph = graph.reverse

Graph Filtering and Subgraphs

Filter graphs by vertex and edge predicates to create subgraphs.

/**
 * Filter graph by edge and vertex predicates
 * @param epred Edge predicate function (EdgeTriplet => Boolean); defaults to accepting
 *              every edge
 * @param vpred Vertex predicate function ((VertexId, VD) => Boolean); defaults to
 *              accepting every vertex
 * @return Subgraph containing only vertices/edges satisfying predicates
 */
def subgraph(
  epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
  vpred: (VertexId, VD) => Boolean = ((v, d) => true)
): Graph[VD, ED]

/**
 * Restrict graph to vertices and edges also present in another graph
 * @tparam VD2 vertex attribute type of the mask graph
 * @tparam ED2 edge attribute type of the mask graph
 * @param other Graph defining the mask
 * @return Intersection of current graph with other graph, typed with this graph's
 *         attribute types (VD, ED)
 */
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]

/**
 * Filter the graph using predicates evaluated on a preprocessed copy.
 *
 * The predicates see the attribute types produced by `preprocess` (VD2, ED2), while the
 * returned graph keeps the original attribute types (VD, ED).
 *
 * @tparam VD2 vertex attribute type of the preprocessed graph
 * @tparam ED2 edge attribute type of the preprocessed graph
 * @param preprocess Function to preprocess graph before filtering
 * @param epred Edge predicate on the preprocessed graph; defaults to accepting every edge
 * @param vpred Vertex predicate on the preprocessed graph; defaults to accepting every vertex
 * @return Filtered graph
 */
def filter[VD2: ClassTag, ED2: ClassTag](
  preprocess: Graph[VD, ED] => Graph[VD2, ED2],
  epred: EdgeTriplet[VD2, ED2] => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
  vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true
): Graph[VD, ED]

Graph Joins and Aggregation

Join graphs with RDDs and perform message-passing aggregation operations.

/**
 * Join vertices with an RDD, transforming vertex attributes
 * @tparam U value type of the joined table
 * @param table RDD of (VertexId, U) pairs to join
 * @param mapFunc Function to combine vertex attribute and table value; it must return
 *                the existing vertex attribute type VD
 * @return New graph with joined vertex attributes (attribute types unchanged)
 */
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(
  mapFunc: (VertexId, VD, U) => VD
): Graph[VD, ED]

/**
 * Left outer join vertices with an RDD
 * @tparam U value type of the joined RDD
 * @tparam VD2 the new vertex attribute type
 * @param other RDD to join with
 * @param mapFunc Function handling (VertexId, VD, Option[U]); the joined value is
 *                passed as an Option
 * @return New graph with joined attributes of type VD2
 */
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
  mapFunc: (VertexId, VD, Option[U]) => VD2
): Graph[VD2, ED]

/**
 * Core message-passing aggregation API
 * @tparam A message type
 * @param sendMsg Function defining messages sent along edges; it returns Unit and
 *                emits messages through the EdgeContext it receives
 * @param mergeMsg Function combining messages at vertices
 * @param tripletFields Fields accessed for optimization; defaults to TripletFields.All
 * @return VertexRDD with aggregated messages
 */
def aggregateMessages[A: ClassTag](
  sendMsg: EdgeContext[VD, ED, A] => Unit,
  mergeMsg: (A, A) => A,
  tripletFields: TripletFields = TripletFields.All
): VertexRDD[A]

Usage Examples:

// Join with user ages. The vertex attribute type changes from String to
// (String, Option[Int]), so outerJoinVertices is required: joinVertices only accepts a
// mapFunc that returns the original attribute type. `age` is None for unmatched vertices.
val ages = sc.parallelize(Array((1L, 25), (2L, 30)))
val graphWithAges = graph.outerJoinVertices(ages)((id, name, age) => (name, age))

// Subgraph filtering: keep vertices flagged active and edges labelled "friend"
// NOTE(review): assumes the vertex attribute type has a Boolean `active` member; the
// String-named vertices built in the construction example do not — confirm the graph used
val activeUsers = graph.subgraph(
  vpred = (id, user) => user.active,
  epred = triplet => triplet.attr == "friend"  
)

// Message aggregation - in-degrees: each edge sends 1 to its destination, messages are summed
val inDegrees = graph.aggregateMessages[Int](
  sendMsg = _.sendToDst(1),
  mergeMsg = _ + _
)

Graph Persistence and Checkpointing

Control caching, persistence, and fault tolerance for iterative graph algorithms.

/**
 * Persist graph at specified storage level
 * @param newLevel Storage level for vertices and edges; defaults to MEMORY_ONLY
 * @return Graph with specified persistence level
 */
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

/**
 * Cache graph at default storage level (MEMORY_ONLY)
 * @return Cached graph
 */
def cache(): Graph[VD, ED]

/**
 * Mark graph for checkpointing to enable fault tolerance.
 * Returns Unit.
 */
def checkpoint(): Unit

/**
 * Remove graph from cache/persistence
 * @param blocking Whether to block until unpersist is complete
 *                 NOTE(review): Spark 3.0+ declares this default as false — confirm
 *                 against the Spark version this page targets
 * @return Unpersisted graph
 */
def unpersist(blocking: Boolean = true): Graph[VD, ED]

/**
 * Repartition edges using specified partitioning strategy
 * @param partitionStrategy Strategy for distributing edges
 * @return Repartitioned graph with the same attribute types
 */
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

Core Data Types

/** 64-bit vertex identifier (alias for Long), used for edge endpoints and VertexRDD keys */
type VertexId = Long

/** Integer partition identifier (alias for Int; must be < 2^30) */
type PartitionID = Int

/**
 * Directed edge with source, destination, and attribute
 * @tparam ED edge attribute type
 * @param srcId ID of the source vertex
 * @param dstId ID of the destination vertex
 * @param attr attribute attached to the edge
 */
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED) {
  /** Get the other vertex ID in this edge, given one of its endpoints */
  def otherVertexId(vid: VertexId): VertexId
  
  /** Get edge direction relative to a vertex */
  def relativeDirection(vid: VertexId): EdgeDirection
}

/**
 * Edge with adjacent vertex attributes for message passing.
 * Extends Edge, so srcId, dstId and attr are available as well.
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 */
class EdgeTriplet[VD, ED] extends Edge[ED] {
  /** Source vertex attribute */
  val srcAttr: VD
  
  /** Destination vertex attribute */
  val dstAttr: VD
  
  /** Get other vertex attribute, given the ID of one endpoint */
  def otherVertexAttr(vid: VertexId): VD
  
  /** Get vertex attribute for specified vertex */
  def vertexAttr(vid: VertexId): VD
  
  /** Convert to tuple representation: ((srcId, srcAttr), (dstId, dstAttr), attr) */
  def toTuple: ((VertexId, VD), (VertexId, VD), ED)
}

/**
 * Context for sending messages in aggregateMessages
 * @tparam VD vertex attribute type
 * @tparam ED edge attribute type
 * @tparam A message type
 */
abstract class EdgeContext[VD, ED, A] {
  /** ID of the edge's source vertex */
  val srcId: VertexId
  /** ID of the edge's destination vertex */
  val dstId: VertexId  
  /** Attribute of the source vertex */
  val srcAttr: VD
  /** Attribute of the destination vertex */
  val dstAttr: VD
  /** Attribute of the edge itself */
  val attr: ED
  
  /** Send message to source vertex */
  def sendToSrc(msg: A): Unit
  
  /** Send message to destination vertex */
  def sendToDst(msg: A): Unit
  
  /** Convert to EdgeTriplet */
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}

Specialized RDDs

/**
 * Specialized RDD for vertices with efficient joins and indexing
 * @tparam VD vertex attribute type
 */
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  /** Reindex to contain only visible vertices */
  def reindex(): VertexRDD[VD]
  
  /** Transform vertex attributes, optionally using the vertex ID */
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
  
  /**
   * Filter vertices by predicate.
   * Overrides RDD.filter, so the predicate receives one (VertexId, VD) tuple rather
   * than two separate arguments.
   */
  override def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]
  
  /** Set difference with another RDD: vertices of this RDD whose IDs are absent from other */
  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
  
  /**
   * Diff operation: for vertices present in both RDDs, returns those whose values
   * differ, keeping the value from other
   */
  def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
  
  /** Left join with another RDD; the joined value is passed as an Option */
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
    f: (VertexId, VD, Option[VD2]) => VD3
  ): VertexRDD[VD3]
  
  /** Inner join with another RDD */
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
    f: (VertexId, VD, U) => VD2
  ): VertexRDD[VD2]
}

/**
 * Specialized RDD for edges with columnar storage
 * @tparam ED edge attribute type
 */
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
  /** Transform edge attributes preserving structure */
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]
  
  /** Reverse all edges */
  def reverse: EdgeRDD[ED]
  
  /**
   * Inner join with another EdgeRDD; f receives the two endpoint IDs followed by the
   * attribute from this RDD and the attribute from other
   */
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(
    f: (VertexId, VertexId, ED, ED2) => ED3
  ): EdgeRDD[ED3]
}