or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md
tile.json

tessl/maven-org-apache-spark--spark-bagel_2-10

Bagel is a Spark implementation of Google's Pregel graph processing framework, deprecated and superseded by GraphX

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.spark/spark-bagel_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-bagel_2-10@1.6.0

index.mddocs/

Apache Spark Bagel

⚠️ DEPRECATED: Bagel is deprecated as of Spark 1.6.0 and superseded by GraphX. This documentation is provided for legacy compatibility only.

Bagel is a Spark implementation of Google's Pregel graph processing framework. It provides a distributed vertex-centric programming model for large-scale graph computation using iterative message passing between vertices in supersteps.

Package Information

  • Package Name: spark-bagel_2.10
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.spark
  • Artifact ID: spark-bagel_2.10
  • Version: 1.6.3
  • Installation: Add Maven/SBT dependency for org.apache.spark:spark-bagel_2.10:1.6.3

Core Imports

import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

For specific components:

import org.apache.spark.bagel.{Bagel, Vertex, Message, Combiner, Aggregator}

For complete usage with Spark components:

import org.apache.spark.{SparkContext, HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.Manifest

Basic Usage

import org.apache.spark.SparkContext
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

// Define custom vertex class
class PageRankVertex(
  val id: String, 
  val rank: Double, 
  val outEdges: Seq[String],
  val active: Boolean
) extends Vertex with Serializable

// Define custom message class  
class PageRankMessage(
  val targetId: String, 
  val rankShare: Double
) extends Message[String] with Serializable

// Load graph data
val vertices = sc.parallelize(Array(
  ("A", new PageRankVertex("A", 1.0, Seq("B", "C"), true)),
  ("B", new PageRankVertex("B", 1.0, Seq("C"), true)),
  ("C", new PageRankVertex("C", 1.0, Seq("A"), true))
))

val messages = sc.parallelize(Array[(String, PageRankMessage)]())

// Define compute function
def compute(vertex: PageRankVertex, msgs: Option[Array[PageRankMessage]], superstep: Int) = {
  val msgSum = msgs.getOrElse(Array()).map(_.rankShare).sum
  val newRank = if (msgSum != 0) 0.15 + 0.85 * msgSum else vertex.rank
  val halt = superstep >= 10
  
  val outMsgs = if (!halt) {
    vertex.outEdges.map(targetId => 
      new PageRankMessage(targetId, newRank / vertex.outEdges.size)
    ).toArray
  } else Array()
  
  (new PageRankVertex(vertex.id, newRank, vertex.outEdges, !halt), outMsgs)
}

// Run Bagel program
val result = Bagel.run(sc, vertices, messages, 3)(compute)

Architecture

Bagel implements the Pregel computational model with these key components:

  • Vertex-Centric Model: Computation focuses on individual vertices rather than edges
  • Superstep Iterations: Program executes in synchronized iterations called supersteps
  • Message Passing: Vertices communicate only through messages sent to other vertices
  • Combiners: Optional message aggregation to reduce network traffic
  • Aggregators: Global reduce operations across all vertices per superstep
  • Distributed Storage: Uses Spark RDDs for distributed vertex and message storage

Capabilities

Graph Computation

Core Bagel computation engine that executes Pregel-style vertex programs with iterative message passing.

@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
object Bagel {
/**
 * Runs a Bagel program with full feature support
 * @param sc SparkContext for distributed execution
 * @param vertices Initial vertex state as RDD of (K, V) pairs
 * @param messages Initial messages as RDD of (K, M) pairs  
 * @param combiner Message combiner for reducing network traffic
 * @param aggregator Optional global aggregator across vertices
 * @param partitioner Partitioning strategy for distributed data
 * @param numPartitions Number of partitions for graph data
 * @param storageLevel Storage level for intermediate RDDs
 * @param compute User-defined vertex computation function
 * @return Final vertex states after convergence
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest, A: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)], 
  messages: RDD[(K, M)],
  combiner: Combiner[M, C],
  aggregator: Option[Aggregator[V, A]], 
  partitioner: Partitioner,
  numPartitions: Int,
  storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
)(
  compute: (V, Option[C], Option[A], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Simplified run without aggregator and default storage
 * @param compute User function taking (vertex, combinedMessages, superstep)
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)], 
  combiner: Combiner[M, C],
  partitioner: Partitioner,
  numPartitions: Int
)(
  compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with custom storage level, no aggregator
 * @param storageLevel RDD caching strategy for intermediate results
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  combiner: Combiner[M, C], 
  partitioner: Partitioner,
  numPartitions: Int,
  storageLevel: StorageLevel
)(
  compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with default HashPartitioner, no aggregator
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  combiner: Combiner[M, C],
  numPartitions: Int
)(
  compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with default HashPartitioner and custom storage
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  combiner: Combiner[M, C],
  numPartitions: Int,
  storageLevel: StorageLevel
)(
  compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with DefaultCombiner, HashPartitioner, and default storage
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  numPartitions: Int
)(
  compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)]

/**
 * Run with DefaultCombiner, HashPartitioner, and custom storage
 */
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
  sc: SparkContext,
  vertices: RDD[(K, V)],
  messages: RDD[(K, M)],
  numPartitions: Int,
  storageLevel: StorageLevel
)(
  compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)]

}

Message Combining

Interface for combining multiple messages to the same vertex to reduce network communication overhead.

/**
 * Trait for combining messages to reduce network traffic
 * @tparam M Original message type
 * @tparam C Combined message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Combiner[M, C] {
  /** Create initial combiner from single message */
  def createCombiner(msg: M): C
  
  /** Merge message into existing combiner */
  def mergeMsg(combiner: C, msg: M): C
  
  /** Merge two combiners together */
  def mergeCombiners(a: C, b: C): C
}

/**
 * Default combiner that appends messages without actual combining
 * @tparam M Message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
  def createCombiner(msg: M): Array[M]
  def mergeMsg(combiner: Array[M], msg: M): Array[M]  
  def mergeCombiners(a: Array[M], b: Array[M]): Array[M]
}

Global Aggregation

Interface for performing reduce operations across all vertices after each superstep.

/**
 * Trait for aggregating values across all vertices per superstep
 * @tparam V Vertex type
 * @tparam A Aggregated value type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Aggregator[V, A] {
  /** Create aggregator from single vertex */
  def createAggregator(vert: V): A
  
  /** Merge two aggregators together */
  def mergeAggregators(a: A, b: A): A
}

Graph Data Types

Core abstractions for representing vertices and messages in the graph computation model.

/**
 * Base trait for graph vertices
 * All user vertex classes must extend this trait
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Vertex {
  /** Whether vertex should continue computation in next superstep */
  def active: Boolean
}

/**
 * Base trait for messages sent between vertices
 * @tparam K Key type for vertex identification
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Message[K] {
  /** ID of destination vertex for this message */
  def targetId: K
}

Constants

/** Default storage level for intermediate RDDs (MEMORY_AND_DISK) */
val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK

Type Parameters

  • K: Key type for vertex identification (must have Manifest)
  • V: Vertex type (must extend Vertex and have Manifest)
  • M: Message type (must extend Message[K] and have Manifest)
  • C: Combined message type (must have Manifest)
  • A: Aggregated value type (must have Manifest)

Error Handling

Bagel operations can throw standard Spark exceptions:

  • SparkException: For general Spark execution failures
  • ClassCastException: For type mismatches in user-defined functions
  • SerializationException: When user classes are not properly serializable

All user-defined vertex, message, combiner, and aggregator classes must extend Serializable (or java.io.Serializable) for distributed execution.

Migration Note

Users should migrate to GraphX: Bagel is deprecated and GraphX provides superior performance, more graph algorithms, and better integration with Spark's ecosystem. See the GraphX Programming Guide for migration guidance.