tessl/maven-org-apache-spark--spark-bagel-2-10

Bagel is a Spark implementation of Google's Pregel graph processing framework, deprecated and superseded by GraphX

Apache Spark Bagel

⚠️ DEPRECATED: Bagel is deprecated as of Spark 1.6.0 and superseded by GraphX. This documentation is provided for legacy compatibility only.

Bagel is a Spark implementation of Google's Pregel graph processing framework. It provides a distributed vertex-centric programming model for large-scale graph computation using iterative message passing between vertices in supersteps.

Package Information

  • Package Name: spark-bagel_2.10
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Artifact ID: spark-bagel_2.10
  • Version: 1.6.3
  • Installation: Add Maven/SBT dependency for org.apache.spark:spark-bagel_2.10:1.6.3
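
For SBT, the coordinates above translate to a single line in build.sbt. This is a sketch that assumes a Scala 2.10 build and that spark-core is declared separately at a matching version; a single `%` is used because the `_2.10` suffix is already spelled out in the artifact ID.

```scala
// build.sbt (sketch): coordinates taken from the package information above
libraryDependencies += "org.apache.spark" % "spark-bagel_2.10" % "1.6.3"
```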

Core Imports

import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

For specific components:

import org.apache.spark.bagel.{Bagel, Vertex, Message, Combiner, Aggregator}

For complete usage with Spark components:

import org.apache.spark.{SparkContext, HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.Manifest

Basic Usage

import org.apache.spark.SparkContext
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

// Define custom vertex class
class PageRankVertex(
  val id: String, 
  val rank: Double, 
  val outEdges: Seq[String],
  val active: Boolean
) extends Vertex with Serializable

// Define custom message class  
class PageRankMessage(
  val targetId: String, 
  val rankShare: Double
) extends Message[String] with Serializable

// Load graph data (sc is an existing SparkContext)
val vertices = sc.parallelize(Array(
  ("A", new PageRankVertex("A", 1.0, Seq("B", "C"), true)),
  ("B", new PageRankVertex("B", 1.0, Seq("C"), true)),
  ("C", new PageRankVertex("C", 1.0, Seq("A"), true))
))

val messages = sc.parallelize(Array[(String, PageRankMessage)]())

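// Define compute function: sum the incoming rank shares, update the rank,
// and split the new rank evenly across this vertex's out-edges
def compute(
  vertex: PageRankVertex,
  msgs: Option[Array[PageRankMessage]],
  superstep: Int
): (PageRankVertex, Array[PageRankMessage]) = {
  val msgSum = msgs.getOrElse(Array.empty[PageRankMessage]).map(_.rankShare).sum
  val newRank = if (msgSum != 0) 0.15 + 0.85 * msgSum else vertex.rank
  val halt = superstep >= 10  // stop sending messages after superstep 10

  // Typed empty array: a bare Array() would not unify with Array[PageRankMessage]
  val outMsgs = if (!halt) {
    vertex.outEdges.map(targetId =>
      new PageRankMessage(targetId, newRank / vertex.outEdges.size)
    ).toArray
  } else Array.empty[PageRankMessage]

  (new PageRankVertex(vertex.id, newRank, vertex.outEdges, !halt), outMsgs)
}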

// Run the Bagel program with numPartitions = 3 (DefaultCombiner, HashPartitioner)
val result = Bagel.run(sc, vertices, messages, 3)(compute)
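
To see what the update rule in compute does to this three-vertex graph, the first round of messages can be replayed locally without Spark. The sketch below is illustrative only: `PageRankStep` and its members are hypothetical names, and it covers just the arithmetic of the first two supersteps, not Bagel's execution.

```scala
object PageRankStep {
  // Same rule as compute: damped sum of incoming shares,
  // or the unchanged rank when nothing arrived
  def newRank(oldRank: Double, incoming: Seq[Double]): Double = {
    val msgSum = incoming.sum
    if (msgSum != 0) 0.15 + 0.85 * msgSum else oldRank
  }

  // Out-edges of the example graph
  val outEdges = Map("A" -> Seq("B", "C"), "B" -> Seq("C"), "C" -> Seq("A"))

  // Superstep 0: no messages yet, so every vertex keeps rank 1.0
  // and sends 1.0 / outDegree to each neighbour
  val sent: Seq[(String, Double)] =
    outEdges.toSeq.flatMap { case (_, targets) =>
      targets.map(t => (t, 1.0 / targets.size))
    }

  // Superstep 1: group the shares by target vertex and apply the update
  val ranksAfterStep1: Map[String, Double] =
    sent.groupBy(_._1).map { case (id, shares) =>
      (id, newRank(1.0, shares.map(_._2)))
    }
}
```

Here A receives 1.0 from C, B receives 0.5 from A, and C receives 0.5 + 1.0, so `PageRankStep.ranksAfterStep1` holds approximately 1.0 for A, 0.575 for B, and 1.425 for C.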

Architecture

Bagel implements the Pregel computational model with these key components:

  • Vertex-Centric Model: Computation focuses on individual vertices rather than edges
  • Superstep Iterations: Program executes in synchronized iterations called supersteps
  • Message Passing: Vertices communicate only through messages sent to other vertices
  • Combiners: Optional message aggregation to reduce network traffic
  • Aggregators: Global reduce operations across all vertices per superstep
  • Distributed Storage: Uses Spark RDDs for distributed vertex and message storage
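
The components above can be sketched as a small loop that runs on local collections. This is a simplified illustration of the Pregel model, not Bagel's implementation: `LocalPregel`, `V`, `M`, and the max-value vertex program are hypothetical, and the sketch assumes the common convention that a run ends once no messages are in flight and no vertex is active.

```scala
object LocalPregel {
  final case class V(id: String, value: Int, outEdges: Seq[String], active: Boolean)
  final case class M(targetId: String, value: Int)

  // Vertex program: keep the largest value seen so far and forward it
  // to neighbours whenever it changes (or on the very first superstep)
  def compute(v: V, msgs: Option[Seq[M]], superstep: Int): (V, Seq[M]) = {
    val best = (v.value +: msgs.getOrElse(Seq.empty[M]).map(_.value)).max
    val changed = best > v.value || superstep == 0
    val out = if (changed) v.outEdges.map(t => M(t, best)) else Seq.empty[M]
    (v.copy(value = best, active = false), out) // vote to halt after each step
  }

  def run(initial: Map[String, V]): Map[String, V] = {
    var verts = initial
    var inbox = Map.empty[String, Seq[M]]
    var superstep = 0
    var done = false
    while (!done) {
      // Only vertices that are active or have incoming messages run compute
      val results = verts.map { case (id, v) =>
        val msgs = inbox.get(id)
        if (v.active || msgs.isDefined) (id, compute(v, msgs, superstep))
        else (id, (v, Seq.empty[M]))
      }
      verts = results.map { case (id, (v, _)) => (id, v) }
      // Deliver messages: group everything sent this superstep by target vertex
      inbox = results.values.flatMap(_._2).toSeq.groupBy(_.targetId)
      done = inbox.isEmpty && !verts.values.exists(_.active)
      superstep += 1
    }
    verts
  }
}
```

Running it on a three-vertex cycle with values 3, 6, and 1 spreads the maximum to every vertex; the barrier between supersteps is simply the point where `inbox` is rebuilt.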

Capabilities

Graph Computation

Core Bagel computation engine that executes Pregel-style vertex programs with iterative message passing.

@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
object Bagel {
/**
 * Runs a Bagel program with full feature support
 * @param sc SparkContext for distributed execution
 * @param vertices Initial vertex state as RDD of (K, V) pairs
 * @param messages Initial messages as RDD of (K, M) pairs  
 * @param combiner Message combiner for reducing network traffic
 * @param aggregator Optional global aggregator across vertices
 * @param partitioner Partitioning strategy for distributed data
 * @param numPartitions Number of partitions for graph data
 * @param storageLevel Storage level for intermediate RDDs
 * @param compute User-defined vertex computation function
 * @return Final vertex states once no messages remain and no vertex is active
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest, A: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)], 
  messages: RDD[(K, M)],
  combiner: Combiner[M, C],
  aggregator: Option[Aggregator[V, A]], 
  partitioner: Partitioner,
  numPartitions: Int,
  storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
)(
  compute: (V, Option[C], Option[A], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Simplified run with no aggregator and the default storage level
 * @param compute User function taking (vertex, combinedMessages, superstep)
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)], 
  combiner: Combiner[M, C],
  partitioner: Partitioner,
  numPartitions: Int
)(
  compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with custom storage level, no aggregator
 * @param storageLevel RDD caching strategy for intermediate results
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  combiner: Combiner[M, C], 
  partitioner: Partitioner,
  numPartitions: Int,
  storageLevel: StorageLevel
)(
  compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with default HashPartitioner, no aggregator
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  combiner: Combiner[M, C],
  numPartitions: Int
)(
  compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with default HashPartitioner and custom storage
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  combiner: Combiner[M, C],
  numPartitions: Int,
  storageLevel: StorageLevel
)(
  compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with DefaultCombiner, HashPartitioner, and default storage
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  numPartitions: Int
)(
  compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with DefaultCombiner, HashPartitioner, and custom storage
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  numPartitions: Int,
  storageLevel: StorageLevel
)(
  compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)]

}

Message Combining

Interface for combining multiple messages to the same vertex to reduce network communication overhead.

/**
 * Trait for combining messages to reduce network traffic
 * @tparam M Original message type
 * @tparam C Combined message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Combiner[M, C] {
  /** Create initial combiner from single message */
  def createCombiner(msg: M): C
  
  /** Merge message into existing combiner */
  def mergeMsg(combiner: C, msg: M): C
  
  /** Merge two combiners together */
  def mergeCombiners(a: C, b: C): C
}

/**
 * Default combiner that appends messages without actual combining
 * @tparam M Message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
  def createCombiner(msg: M): Array[M]
  def mergeMsg(combiner: Array[M], msg: M): Array[M]  
  def mergeCombiners(a: Array[M], b: Array[M]): Array[M]
}
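
As an illustration of a custom combiner, the sketch below collapses all rank messages bound for one vertex into a single Double. To keep it compilable without Spark on the classpath, it redeclares stand-in copies of the documented `Message` and `Combiner` traits; with Bagel available, import them from org.apache.spark.bagel instead. `RankMessage` and `RankSumCombiner` are hypothetical names.

```scala
object CombinerSketch {
  // Stand-ins mirroring the documented trait signatures (assumption:
  // replace with org.apache.spark.bagel.{Message, Combiner} in a real job)
  trait Message[K] { def targetId: K }
  trait Combiner[M, C] {
    def createCombiner(msg: M): C
    def mergeMsg(combiner: C, msg: M): C
    def mergeCombiners(a: C, b: C): C
  }

  // Hypothetical message carrying a share of rank to targetId
  class RankMessage(val targetId: String, val rankShare: Double)
    extends Message[String] with java.io.Serializable

  // Hypothetical combiner: keeps only the running sum of rank shares
  class RankSumCombiner extends Combiner[RankMessage, Double]
      with java.io.Serializable {
    def createCombiner(msg: RankMessage): Double = msg.rankShare
    def mergeMsg(combiner: Double, msg: RankMessage): Double =
      combiner + msg.rankShare
    def mergeCombiners(a: Double, b: Double): Double = a + b
  }
}
```

With a combiner of type Combiner[M, C], the compute function passed to the run overloads that take a combiner receives Option[C] (here Option[Double]) rather than Option[Array[M]].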

Global Aggregation

Interface for performing reduce operations across all vertices after each superstep.

/**
 * Trait for aggregating values across all vertices per superstep
 * @tparam V Vertex type
 * @tparam A Aggregated value type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Aggregator[V, A] {
  /** Create aggregator from single vertex */
  def createAggregator(vert: V): A
  
  /** Merge two aggregators together */
  def mergeAggregators(a: A, b: A): A
}
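
A minimal aggregator sketch follows, summing the rank of every vertex. As a Spark-free illustration it redeclares stand-in copies of the documented `Vertex` and `Aggregator` traits; `RankVertex`, `TotalRank`, and the local `aggregate` helper are hypothetical and only show the map-then-reduce shape that the two trait methods imply.

```scala
object AggregatorSketch {
  // Stand-ins mirroring the documented trait signatures (assumption:
  // replace with org.apache.spark.bagel.{Vertex, Aggregator} in a real job)
  trait Vertex { def active: Boolean }
  trait Aggregator[V, A] {
    def createAggregator(vert: V): A
    def mergeAggregators(a: A, b: A): A
  }

  // Hypothetical vertex holding a rank
  class RankVertex(val id: String, val rank: Double, val active: Boolean)
    extends Vertex with java.io.Serializable

  // Hypothetical aggregator: total rank across all vertices
  class TotalRank extends Aggregator[RankVertex, Double]
      with java.io.Serializable {
    def createAggregator(vert: RankVertex): Double = vert.rank
    def mergeAggregators(a: Double, b: Double): Double = a + b
  }

  // Local illustration: map each vertex to an A, then reduce pairwise.
  // Returns None for an empty vertex set.
  def aggregate[V, A](verts: Seq[V], agg: Aggregator[V, A]): Option[A] =
    verts
      .map(v => agg.createAggregator(v))
      .reduceOption((a, b) => agg.mergeAggregators(a, b))
}
```

In the full run signature, the aggregated value reaches compute as its Option[A] argument, so a vertex program could, for example, normalise its rank against the total.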

Graph Data Types

Core abstractions for representing vertices and messages in the graph computation model.

/**
 * Base trait for graph vertices
 * All user vertex classes must extend this trait
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Vertex {
  /** Whether vertex should continue computation in next superstep */
  def active: Boolean
}

/**
 * Base trait for messages sent between vertices
 * @tparam K Key type for vertex identification
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Message[K] {
  /** ID of destination vertex for this message */
  def targetId: K
}

Constants

/** Default storage level for intermediate RDDs (MEMORY_AND_DISK) */
val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK

Type Parameters

  • K: Key type for vertex identification (must have Manifest)
  • V: Vertex type (must extend Vertex and have Manifest)
  • M: Message type (must extend Message[K] and have Manifest)
  • C: Combined message type (must have Manifest)
  • A: Aggregated value type (must have Manifest)

Error Handling

Bagel programs can fail with the usual Spark and JVM exceptions:

  • SparkException: General Spark execution failures, including "Task not serializable" errors
  • ClassCastException: Type mismatches in user-defined functions
  • java.io.NotSerializableException: User classes that are not serializable

All user-defined vertex, message, combiner, and aggregator classes must extend Serializable (or java.io.Serializable) so that Spark can ship them between nodes.
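
Serialization problems can often be caught before a job is submitted. The helper below is a hypothetical pre-flight check, not part of Bagel or Spark; it assumes Java serialization (Spark's default serializer) and simply tries to write the value to an in-memory stream.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Returns true if Java serialization accepts the value, and false if the
  // value (or something it references) does not implement Serializable
  def isJavaSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(value) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Calling `SerializationCheck.isJavaSerializable` on a sample vertex, message, combiner, and aggregator instance in a unit test gives an early signal. Note that classes defined inside a REPL or another enclosing instance may capture a non-serializable outer reference and fail the check even when they extend Serializable.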

Migration Note

Users should migrate to GraphX. Bagel is deprecated as of Spark 1.6.0 and is no longer included in Spark 2.0 and later. GraphX provides better performance, its own Pregel-style API, a library of built-in graph algorithms, and tighter integration with the rest of Spark. See the GraphX Programming Guide for migration guidance.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-spark--spark-bagel-2-10
