Bagel is a Spark implementation of Google's Pregel graph processing framework, deprecated and superseded by GraphX
npx @tessl/cli install tessl/maven-org-apache-spark--spark-bagel_2-10@1.6.0

⚠️ DEPRECATED: Bagel is deprecated as of Spark 1.6.0 and superseded by GraphX. This documentation is provided for legacy compatibility only.
Bagel is a Spark implementation of Google's Pregel graph processing framework. It provides a distributed vertex-centric programming model for large-scale graph computation using iterative message passing between vertices in supersteps.
org.apache.spark:spark-bagel_2.10:1.6.3

import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

For specific components:
import org.apache.spark.bagel.{Bagel, Vertex, Message, Combiner, Aggregator}

For complete usage with Spark components:
import org.apache.spark.{SparkContext, HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.Manifest

import org.apache.spark.SparkContext
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._
// Define custom vertex class
class PageRankVertex(
val id: String,
val rank: Double,
val outEdges: Seq[String],
val active: Boolean
) extends Vertex with Serializable
// Define custom message class
class PageRankMessage(
val targetId: String,
val rankShare: Double
) extends Message[String] with Serializable
// Load graph data (assumes an existing SparkContext named sc)
val vertices = sc.parallelize(Array(
("A", new PageRankVertex("A", 1.0, Seq("B", "C"), true)),
("B", new PageRankVertex("B", 1.0, Seq("C"), true)),
("C", new PageRankVertex("C", 1.0, Seq("A"), true))
))
val messages = sc.parallelize(Array[(String, PageRankMessage)]())
// Define compute function
def compute(vertex: PageRankVertex, msgs: Option[Array[PageRankMessage]], superstep: Int) = {
val msgSum = msgs.getOrElse(Array[PageRankMessage]()).map(_.rankShare).sum
val newRank = if (msgSum != 0) 0.15 + 0.85 * msgSum else vertex.rank
val halt = superstep >= 10
val outMsgs = if (!halt) {
vertex.outEdges.map(targetId =>
new PageRankMessage(targetId, newRank / vertex.outEdges.size)
).toArray
} else Array[PageRankMessage]()
(new PageRankVertex(vertex.id, newRank, vertex.outEdges, !halt), outMsgs)
}
// Run Bagel program
val result = Bagel.run(sc, vertices, messages, 3)(compute)
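The returned RDD holds the final vertex states. A minimal sketch of materializing and inspecting the computed ranks:

// Collect final vertex states to the driver and print each rank
result.collect().foreach { case (id, vertex) =>
  println(s"$id -> ${vertex.rank}")
}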
Bagel implements the Pregel computational model with these key components:

Core Bagel computation engine that executes Pregel-style vertex programs with iterative message passing.
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
object Bagel {
/**
* Runs a Bagel program with full feature support
* @param sc SparkContext for distributed execution
* @param vertices Initial vertex state as RDD of (K, V) pairs
* @param messages Initial messages as RDD of (K, M) pairs
* @param combiner Message combiner for reducing network traffic
* @param aggregator Optional global aggregator across vertices
* @param partitioner Partitioning strategy for distributed data
* @param numPartitions Number of partitions for graph data
* @param storageLevel Storage level for intermediate RDDs
* @param compute User-defined vertex computation function
* @return Final vertex states after convergence
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest, A: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
aggregator: Option[Aggregator[V, A]],
partitioner: Partitioner,
numPartitions: Int,
storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
)(
compute: (V, Option[C], Option[A], Int) => (V, Array[M])
): RDD[(K, V)]
/**
* Simplified run without aggregator and default storage
* @param compute User function taking (vertex, combinedMessages, superstep)
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
partitioner: Partitioner,
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]
/**
* Run with custom storage level, no aggregator
* @param storageLevel RDD caching strategy for intermediate results
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
partitioner: Partitioner,
numPartitions: Int,
storageLevel: StorageLevel
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]
/**
* Run with default HashPartitioner, no aggregator
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
numPartitions: Int
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]
/**
* Run with default HashPartitioner and custom storage
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
combiner: Combiner[M, C],
numPartitions: Int,
storageLevel: StorageLevel
)(
compute: (V, Option[C], Int) => (V, Array[M])
): RDD[(K, V)]
/**
* Run with DefaultCombiner, HashPartitioner, and default storage
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
numPartitions: Int
)(
compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)]
/**
* Run with DefaultCombiner, HashPartitioner, and custom storage
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
sc: SparkContext,
vertices: RDD[(K, V)],
messages: RDD[(K, M)],
numPartitions: Int,
storageLevel: StorageLevel
)(
compute: (V, Option[Array[M]], Int) => (V, Array[M])
): RDD[(K, V)]
}
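A sketch of a combiner-aware overload, reusing the PageRank classes from the quick-start example. RankSumCombiner and computeCombined are illustrative names, not part of the Bagel API; the Combiner interface they rely on is documented in the next section.

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// Illustrative combiner that sums incoming rank shares into a single Double
class RankSumCombiner extends Combiner[PageRankMessage, Double] with Serializable {
  def createCombiner(msg: PageRankMessage): Double = msg.rankShare
  def mergeMsg(combiner: Double, msg: PageRankMessage): Double = combiner + msg.rankShare
  def mergeCombiners(a: Double, b: Double): Double = a + b
}

// With a combiner, compute receives Option[Double] rather than Option[Array[PageRankMessage]]
def computeCombined(vertex: PageRankVertex, msgSum: Option[Double], superstep: Int) = {
  val newRank = msgSum.map(s => 0.15 + 0.85 * s).getOrElse(vertex.rank)
  val halt = superstep >= 10
  val outMsgs =
    if (!halt) vertex.outEdges.map(t => new PageRankMessage(t, newRank / vertex.outEdges.size)).toArray
    else Array[PageRankMessage]()
  (new PageRankVertex(vertex.id, newRank, vertex.outEdges, !halt), outMsgs)
}

val combined = Bagel.run(
  sc, vertices, messages,
  new RankSumCombiner, new HashPartitioner(3), 3,
  StorageLevel.MEMORY_AND_DISK
)(computeCombined)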
Interface for combining multiple messages to the same vertex to reduce network communication overhead.

/**
* Trait for combining messages to reduce network traffic
* @tparam M Original message type
* @tparam C Combined message type
*/
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Combiner[M, C] {
/** Create initial combiner from single message */
def createCombiner(msg: M): C
/** Merge message into existing combiner */
def mergeMsg(combiner: C, msg: M): C
/** Merge two combiners together */
def mergeCombiners(a: C, b: C): C
}
/**
* Default combiner that appends messages without actual combining
* @tparam M Message type
*/
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
def createCombiner(msg: M): Array[M]
def mergeMsg(combiner: Array[M], msg: M): Array[M]
def mergeCombiners(a: Array[M], b: Array[M]): Array[M]
}
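DefaultCombiner does no real combining; as described above, it only accumulates messages, which is why the simple run overloads hand compute an Option[Array[M]]. A hedged sketch of that behavior, using the PageRankMessage class from the quick-start example:

val dc = new DefaultCombiner[PageRankMessage]
val one = dc.createCombiner(new PageRankMessage("B", 0.5))   // one pending message
val two = dc.mergeMsg(one, new PageRankMessage("B", 0.25))   // appended, now two messages
val all = dc.mergeCombiners(two, two)                        // concatenated arrays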
Interface for performing reduce operations across all vertices after each superstep.

/**
* Trait for aggregating values across all vertices per superstep
* @tparam V Vertex type
* @tparam A Aggregated value type
*/
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Aggregator[V, A] {
/** Create aggregator from single vertex */
def createAggregator(vert: V): A
/** Merge two aggregators together */
def mergeAggregators(a: A, b: A): A
}
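A sketch of the full-featured run overload with an aggregator, reusing RankSumCombiner and the PageRank classes from the sketches above; RankTotalAggregator and computeWithTotal are illustrative names. The aggregate over the current vertex states is delivered to compute as the extra Option argument.

import org.apache.spark.HashPartitioner

// Illustrative aggregator that sums all vertex ranks
class RankTotalAggregator extends Aggregator[PageRankVertex, Double] with Serializable {
  def createAggregator(vert: PageRankVertex): Double = vert.rank
  def mergeAggregators(a: Double, b: Double): Double = a + b
}

def computeWithTotal(
    vertex: PageRankVertex,
    msgSum: Option[Double],
    rankTotal: Option[Double],
    superstep: Int): (PageRankVertex, Array[PageRankMessage]) = {
  val newRank = msgSum.map(s => 0.15 + 0.85 * s).getOrElse(vertex.rank)
  val halt = superstep >= 10
  val outMsgs =
    if (!halt) vertex.outEdges.map(t => new PageRankMessage(t, newRank / vertex.outEdges.size)).toArray
    else Array[PageRankMessage]()
  (new PageRankVertex(vertex.id, newRank, vertex.outEdges, !halt), outMsgs)
}

val withAgg = Bagel.run(
  sc, vertices, messages,
  new RankSumCombiner,
  Some(new RankTotalAggregator),
  new HashPartitioner(3), 3
)(computeWithTotal)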
Core abstractions for representing vertices and messages in the graph computation model.

/**
* Base trait for graph vertices
* All user vertex classes must extend this trait
*/
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Vertex {
/** Whether vertex should continue computation in next superstep */
def active: Boolean
}
/**
* Base trait for messages sent between vertices
* @tparam K Key type for vertex identification
*/
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Message[K] {
/** ID of destination vertex for this message */
def targetId: K
}
/** Default storage level for intermediate RDDs (MEMORY_AND_DISK) */
val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK
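Any run overload with a storageLevel parameter can override this default. A minimal sketch reusing the quick-start compute function:

import org.apache.spark.storage.StorageLevel

// Keep intermediate RDDs in memory only instead of spilling to disk
val inMemory = Bagel.run(sc, vertices, messages, 3, StorageLevel.MEMORY_ONLY)(compute)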
Bagel operations can throw standard Spark exceptions.

All user-defined vertex, message, combiner, and aggregator classes must extend Serializable (or java.io.Serializable) for distributed execution.
Users should migrate to GraphX: Bagel is deprecated and GraphX provides superior performance, more graph algorithms, and better integration with Spark's ecosystem. See the GraphX Programming Guide for migration guidance.
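For orientation, a hedged sketch of the same PageRank-style loop in GraphX's Pregel API. It assumes a graph: Graph[Double, Double] whose vertex attribute holds the rank and whose edge attribute holds the out-degree-normalized weight; both names are illustrative, not prescribed by GraphX.

import org.apache.spark.graphx._

// Vertex program, message sender, and message merger play the roles that
// compute, outgoing messages, and Combiner play in Bagel
val ranks: Graph[Double, Double] = graph.pregel(0.0, 10)(
  (id, rank, msgSum) => if (msgSum == 0.0) rank else 0.15 + 0.85 * msgSum,
  triplet => Iterator((triplet.dstId, triplet.srcAttr * triplet.attr)),
  (a, b) => a + b
)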