Bagel is a Spark implementation of Google's Pregel graph processing framework, deprecated and superseded by GraphX
⚠️ DEPRECATED: Bagel is deprecated as of Spark 1.6.0 and superseded by GraphX. This documentation is provided for legacy compatibility only.
Bagel is a Spark implementation of Google's Pregel graph processing framework. It provides a distributed vertex-centric programming model for large-scale graph computation using iterative message passing between vertices in supersteps.
Maven dependency: org.apache.spark:spark-bagel_2.10:1.6.3
Basic imports:
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._
For specific components:
import org.apache.spark.bagel.{Bagel, Vertex, Message, Combiner, Aggregator}
For complete usage with Spark components:
import org.apache.spark.{SparkContext, HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.Manifest
Basic usage (PageRank on a three-vertex graph):
import org.apache.spark.SparkContext
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._
// Define custom vertex class.
// A case class gives value equality, a readable toString (handy when inspecting
// `result.collect()`), and `copy` for producing updated vertices. Case classes are
// already Serializable; `with Serializable` is kept to make that requirement explicit.
/**
 * PageRank vertex state.
 *
 * @param id       vertex identifier; the example also uses it as the RDD key
 * @param rank     current PageRank value
 * @param outEdges ids of the vertices this vertex links to
 * @param active   whether the vertex keeps computing in the next superstep (implements `Vertex.active`)
 */
case class PageRankVertex(
  id: String,
  rank: Double,
  outEdges: Seq[String],
  active: Boolean
) extends Vertex with Serializable
// Define custom message class.
// A case class gives value equality and a readable toString for debugging.
/**
 * Message carrying a share of the sender's rank to another vertex.
 *
 * @param targetId  id of the destination vertex (implements `Message[String].targetId`)
 * @param rankShare portion of the sender's rank delivered to `targetId`
 */
case class PageRankMessage(
  targetId: String,
  rankShare: Double
) extends Message[String] with Serializable
// Load graph data.
// NOTE(review): `sc` is assumed to be an existing SparkContext (e.g. the one spark-shell provides).
// Adjacency list (vertex id -> out-neighbour ids); every vertex starts active with rank 1.0
// and is keyed by its own id.
val vertices = sc.parallelize(
  Array(
    "A" -> Seq("B", "C"),
    "B" -> Seq("C"),
    "C" -> Seq("A")
  ).map { case (vertexId, edges) => vertexId -> new PageRankVertex(vertexId, 1.0, edges, true) }
)
// No messages are in flight before the first superstep.
val messages = sc.parallelize(Array.empty[(String, PageRankMessage)])
// Define compute function.
/**
 * One PageRank superstep for a single vertex.
 *
 * @param vertex    current vertex state
 * @param msgs      rank shares received from in-neighbours, if any arrived
 * @param superstep superstep index supplied by Bagel
 * @return the updated vertex and the messages to deliver next superstep
 */
def compute(
    vertex: PageRankVertex,
    msgs: Option[Array[PageRankMessage]],
    superstep: Int): (PageRankVertex, Array[PageRankMessage]) = {
  // Recompute the rank only when messages actually arrived; otherwise keep the old value.
  // (Testing presence directly avoids using a floating-point `sum != 0` as a proxy.)
  val newRank = msgs match {
    case Some(received) if received.nonEmpty => 0.15 + 0.85 * received.map(_.rankShare).sum
    case _ => vertex.rank
  }
  // After 10 supersteps the vertex deactivates and stops sending messages.
  val halt = superstep >= 10
  // The explicit element type matters: a bare `Array()` infers Array[Nothing], and because
  // arrays are invariant the result would no longer be an Array[PageRankMessage].
  // Mapping over an empty outEdges yields no messages, so the division never sees size 0.
  val outMsgs: Array[PageRankMessage] =
    if (halt) Array.empty[PageRankMessage]
    else vertex.outEdges.map(targetId => new PageRankMessage(targetId, newRank / vertex.outEdges.size)).toArray
  (new PageRankVertex(vertex.id, newRank, vertex.outEdges, !halt), outMsgs)
}
// Run Bagel program.
// Four arguments select the overload that uses DefaultCombiner, a HashPartitioner and the
// default storage level; 3 is numPartitions. `compute` therefore receives Option[Array[M]].
// `result` is an RDD of (vertex id, final PageRankVertex) pairs.
val result = Bagel.run(sc, vertices, messages, 3)(compute)
Bagel implements the Pregel computational model with these key components:
Core Bagel computation engine that executes Pregel-style vertex programs with iterative message passing.
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
object Bagel {
  /**
   * Runs a Bagel program with full feature support.
   *
   * The other `run` overloads accept subsets of these parameters: no aggregator,
   * a default HashPartitioner, DefaultCombiner, and/or the default storage level,
   * as described on each one.
   *
   * @param sc SparkContext for distributed execution
   * @param vertices Initial vertex state as RDD of (K, V) pairs
   * @param messages Initial messages as RDD of (K, M) pairs
   * @param combiner Message combiner for reducing network traffic
   * @param aggregator Optional global aggregator across vertices
   * @param partitioner Partitioning strategy for distributed data
   * @param numPartitions Number of partitions for graph data
   * @param storageLevel Storage level for intermediate RDDs (defaults to DEFAULT_STORAGE_LEVEL)
   * @param compute User-defined vertex computation function. It is called with
   *                (vertex, combined messages if any, aggregated value if any, superstep)
   *                and returns the updated vertex plus the messages it sends.
   * @tparam K vertex key type; each message's `targetId` is of this type
   * @tparam V vertex type
   * @tparam M message type
   * @tparam C combined-message type produced by `combiner`
   * @tparam A aggregated value type produced by `aggregator`
   * @return Final vertex states after convergence
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest, A: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    aggregator: Option[Aggregator[V, A]],
    partitioner: Partitioner,
    numPartitions: Int,
    storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
  )(
    compute: (V, Option[C], Option[A], Int) => (V, Array[M])
  ): RDD[(K, V)]
  /**
   * Simplified run without aggregator and default storage.
   * @param combiner Message combiner for reducing network traffic
   * @param partitioner Partitioning strategy for distributed data
   * @param numPartitions Number of partitions for graph data
   * @param compute User function taking (vertex, combinedMessages, superstep)
   * @return Final vertex states
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    partitioner: Partitioner,
    numPartitions: Int
  )(
    compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]
  /**
   * Run with custom storage level, no aggregator.
   * @param partitioner Partitioning strategy for distributed data
   * @param numPartitions Number of partitions for graph data
   * @param storageLevel RDD caching strategy for intermediate results
   * @param compute User function taking (vertex, combinedMessages, superstep)
   * @return Final vertex states
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    partitioner: Partitioner,
    numPartitions: Int,
    storageLevel: StorageLevel
  )(
    compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]
  /**
   * Run with default HashPartitioner, no aggregator, default storage.
   * @param combiner Message combiner for reducing network traffic
   * @param numPartitions Number of partitions for graph data
   * @param compute User function taking (vertex, combinedMessages, superstep)
   * @return Final vertex states
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    numPartitions: Int
  )(
    compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]
  /**
   * Run with default HashPartitioner and custom storage, no aggregator.
   * @param combiner Message combiner for reducing network traffic
   * @param numPartitions Number of partitions for graph data
   * @param storageLevel RDD caching strategy for intermediate results
   * @param compute User function taking (vertex, combinedMessages, superstep)
   * @return Final vertex states
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    numPartitions: Int,
    storageLevel: StorageLevel
  )(
    compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]
  /**
   * Run with DefaultCombiner, HashPartitioner, and default storage.
   * No combiner type parameter: messages to a vertex are gathered into an Array[M].
   * @param numPartitions Number of partitions for graph data
   * @param compute User function taking (vertex, received messages if any, superstep)
   * @return Final vertex states
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    numPartitions: Int
  )(
    compute: (V, Option[Array[M]], Int) => (V, Array[M])
  ): RDD[(K, V)]
  /**
   * Run with DefaultCombiner, HashPartitioner, and custom storage.
   * Messages to a vertex are gathered into an Array[M].
   * @param numPartitions Number of partitions for graph data
   * @param storageLevel RDD caching strategy for intermediate results
   * @param compute User function taking (vertex, received messages if any, superstep)
   * @return Final vertex states
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    numPartitions: Int,
    storageLevel: StorageLevel
  )(
    compute: (V, Option[Array[M]], Int) => (V, Array[M])
  ): RDD[(K, V)]
}
Interface for combining multiple messages to the same vertex to reduce network communication overhead.
/**
 * Trait for combining messages to reduce network traffic.
 *
 * Messages addressed to the same vertex are folded into one value of type `C`.
 * `createCombiner` seeds it from a single message, `mergeMsg` adds further messages,
 * and `mergeCombiners` joins two partial results. `Bagel.run` hands this combined
 * value to `compute` as its `Option[C]` argument.
 * Implementations must be serializable for distributed execution.
 *
 * @tparam M Original message type
 * @tparam C Combined message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Combiner[M, C] {
  /** Create initial combiner from single message */
  def createCombiner(msg: M): C
  /** Merge message into existing combiner */
  def mergeMsg(combiner: C, msg: M): C
  /** Merge two combiners together */
  def mergeCombiners(a: C, b: C): C
}
/**
 * Default combiner that appends messages without actual combining: every message
 * addressed to a vertex is kept, in arrival order, in a single array.
 *
 * The `Manifest` context bound supplies the runtime element class needed to build `Array[M]`.
 *
 * @tparam M Message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
  /** Starts a new one-element array holding `msg`. */
  def createCombiner(msg: M): Array[M] = Array(msg)
  /** Returns a new array with `msg` appended after the existing messages. */
  def mergeMsg(combiner: Array[M], msg: M): Array[M] = combiner :+ msg
  /** Concatenates two partial arrays, keeping the messages of `a` first. */
  def mergeCombiners(a: Array[M], b: Array[M]): Array[M] = a ++ b
}
Interface for performing reduce operations across all vertices after each superstep.
/**
 * Trait for aggregating values across all vertices per superstep.
 *
 * `createAggregator` maps each vertex to a partial value, and `mergeAggregators`
 * combines partial values. `Bagel.run` passes the result to `compute` as its
 * `Option[A]` argument.
 * NOTE(review): partial values are presumably merged in no fixed order across
 * partitions, so `mergeAggregators` should be associative and commutative. Confirm.
 *
 * @tparam V Vertex type
 * @tparam A Aggregated value type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Aggregator[V, A] {
  /** Create aggregator from single vertex */
  def createAggregator(vert: V): A
  /** Merge two aggregators together */
  def mergeAggregators(a: A, b: A): A
}
Core abstractions for representing vertices and messages in the graph computation model.
/**
 * Base trait for graph vertices.
 * All user vertex classes must extend this trait, and must be serializable for
 * distributed execution.
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Vertex {
  /**
   * Whether vertex should continue computation in next superstep.
   * A `compute` function typically returns `false` here once the vertex has finished.
   */
  def active: Boolean
}
/**
 * Base trait for messages sent between vertices.
 * Messages must be serializable for distributed execution.
 *
 * @tparam K Key type for vertex identification; `Bagel.run` requires it to match the
 *           key type of the vertices RDD (`M <: Message[K]` with `RDD[(K, V)]`)
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Message[K] {
  /** ID of destination vertex for this message */
  def targetId: K
}
/** Default storage level for intermediate RDDs (MEMORY_AND_DISK) */
// Default of the `storageLevel` parameter of the full-featured Bagel.run overload.
// NOTE(review): run refers to it unqualified, so it is presumably a member of object Bagel. Confirm.
val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK
Bagel operations can throw standard Spark exceptions:
All user-defined vertex, message, combiner, and aggregator classes must extend Serializable (or java.io.Serializable) for distributed execution.
Users should migrate to GraphX: Bagel is deprecated and GraphX provides superior performance, more graph algorithms, and better integration with Spark's ecosystem. See the GraphX Programming Guide for migration guidance.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-bagel-2-10