# Apache Spark Bagel

**⚠️ DEPRECATED**: Bagel is deprecated as of Spark 1.6.0 and superseded by [GraphX](https://spark.apache.org/docs/latest/graphx-programming-guide.html). This documentation is provided for legacy compatibility only.

Bagel is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. It provides a distributed vertex-centric programming model for large-scale graph computation using iterative message passing between vertices in supersteps.

## Package Information

- **Package Name**: spark-bagel_2.10
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-bagel_2.10
- **Version**: 1.6.3
- **Installation**: Add a Maven or SBT dependency on `org.apache.spark:spark-bagel_2.10:1.6.3`

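For an sbt build, the dependency could be declared as in the sketch below. It assumes a Scala 2.10 project, because the `_2.10` suffix of the artifact must match the project's Scala binary version.

```scala
// build.sbt (sketch): the _2.10 suffix must match the project's Scala binary version (2.10.x)
libraryDependencies += "org.apache.spark" % "spark-bagel_2.10" % "1.6.3"

// Equivalent when scalaVersion is a 2.10.x release: %% appends "_2.10" automatically
// libraryDependencies += "org.apache.spark" %% "spark-bagel" % "1.6.3"
```
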
## Core Imports

```scala
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._ // brings run and DEFAULT_STORAGE_LEVEL into scope
```

For specific components:

```scala
import org.apache.spark.bagel.{Bagel, Vertex, Message, Combiner, Aggregator}
```

The full `run` signatures also reference these Spark and Scala types:

```scala
import org.apache.spark.{SparkContext, HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.Manifest // optional: Manifest is also in scope via Predef
```

## Basic Usage

The example below runs a simplified, unnormalized PageRank on a three-vertex graph; every vertex votes to halt at superstep 10.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

// Create a local SparkContext (in spark-shell, reuse the existing `sc` instead)
val sc = new SparkContext(new SparkConf().setAppName("BagelPageRank").setMaster("local[2]"))

// Define custom vertex class; the `active` field implements Vertex.active
class PageRankVertex(
    val id: String,
    val rank: Double,
    val outEdges: Seq[String],
    val active: Boolean
) extends Vertex with Serializable

// Define custom message class; the `targetId` field implements Message.targetId
class PageRankMessage(
    val targetId: String,
    val rankShare: Double
) extends Message[String] with Serializable

// Load graph data as (vertex ID, vertex) pairs
val vertices = sc.parallelize(Array(
  ("A", new PageRankVertex("A", 1.0, Seq("B", "C"), true)),
  ("B", new PageRankVertex("B", 1.0, Seq("C"), true)),
  ("C", new PageRankVertex("C", 1.0, Seq("A"), true))
))

// No messages are in flight before the first superstep
val messages = sc.parallelize(Array[(String, PageRankMessage)]())

// Compute function: called for every vertex in every superstep with the messages it received.
// The explicit return type keeps both branches typed as Array[PageRankMessage].
def compute(
    vertex: PageRankVertex,
    msgs: Option[Array[PageRankMessage]],
    superstep: Int
): (PageRankVertex, Array[PageRankMessage]) = {
  val msgSum = msgs.map(_.map(_.rankShare).sum).getOrElse(0.0)
  val newRank = if (msgSum != 0) 0.15 + 0.85 * msgSum else vertex.rank
  val halt = superstep >= 10

  val outMsgs =
    if (!halt) vertex.outEdges.map(targetId =>
      new PageRankMessage(targetId, newRank / vertex.outEdges.size)).toArray
    else Array.empty[PageRankMessage]

  (new PageRankVertex(vertex.id, newRank, vertex.outEdges, !halt), outMsgs)
}

// Run Bagel program with the DefaultCombiner and a HashPartitioner over 3 partitions
val result = Bagel.run(sc, vertices, messages, 3)(compute)
result.collect().foreach { case (id, v) => println(s"$id: ${v.rank}") }
```

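## Architecture

Bagel implements the Pregel computational model with these key components:

- **Vertex-Centric Model**: Computation is a user function applied to each vertex; edges are not first-class and are usually stored in the vertex state (for example, an `outEdges` field)
- **Superstep Iterations**: The program executes in synchronized iterations called supersteps and stops after the first superstep in which no messages are sent and no vertex remains active
- **Message Passing**: Vertices communicate only through messages addressed to other vertices by ID; messages sent in one superstep are delivered in the next
- **Combiners**: Merge all messages addressed to the same vertex; the default combiner simply collects them into an array, while a custom combiner can shrink them to reduce network traffic
- **Aggregators**: Optional global reduce over all vertices, computed once per superstep and passed to every vertex
- **Distributed Storage**: Vertex and message state live in Spark RDDs keyed by vertex ID and are cached at a configurable storage level in each superstep
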
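The superstep and termination rules above can be sketched on a single machine in plain Scala, without Spark. `LocalVertex`, `LocalMsg`, and `LocalPregel` are hypothetical names for this illustration only. The sketch assumes Bagel's stopping rule (a superstep sent no messages and left no vertex active) and models the default combiner by grouping each vertex's messages into a sequence.

```scala
// Hypothetical single-machine model of the superstep loop; not Bagel's implementation
case class LocalVertex(id: String, value: Int, outEdges: Seq[String], active: Boolean)
case class LocalMsg(targetId: String, payload: Int)

object LocalPregel {
  type Compute = (LocalVertex, Option[Seq[LocalMsg]], Int) => (LocalVertex, Seq[LocalMsg])

  // Runs supersteps until one sends no messages and leaves every vertex inactive.
  // Returns the final vertices and the number of supersteps executed.
  def runLocal(initial: Map[String, LocalVertex], compute: Compute): (Map[String, LocalVertex], Int) = {
    var verts = initial
    var inbox = Map.empty[String, Seq[LocalMsg]]
    var superstep = 0
    var done = false
    while (!done) {
      // Every vertex is computed, with Some(messages) only if it received any
      val results = verts.map { case (id, v) => id -> compute(v, inbox.get(id), superstep) }
      verts = results.map { case (id, (v, _)) => id -> v }
      val sent = results.values.flatMap(_._2).toSeq
      // Group messages by target (like the default combiner); unknown targets are dropped
      inbox = sent.groupBy(_.targetId).filter { case (id, _) => verts.contains(id) }
      superstep += 1
      done = sent.isEmpty && verts.values.forall(v => !v.active)
    }
    (verts, superstep)
  }

  // Example vertex program: propagate the maximum value. A vertex stays active
  // (and sends its value) only in superstep 0 or when its value changed.
  def maxCompute(v: LocalVertex, msgs: Option[Seq[LocalMsg]], step: Int): (LocalVertex, Seq[LocalMsg]) = {
    val best = (v.value +: msgs.getOrElse(Seq.empty[LocalMsg]).map(_.payload)).max
    val changed = step == 0 || best != v.value
    val out = if (changed) v.outEdges.map(t => LocalMsg(t, best)) else Seq.empty[LocalMsg]
    (v.copy(value = best, active = changed), out)
  }
}
```

On the cycle A(3) → B(6) → C(2) → A, the maximum reaches every vertex and the loop stops after the first superstep in which nothing is sent and all vertices have voted to halt.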
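## Capabilities

### Graph Computation

Core Bagel computation engine that executes Pregel-style vertex programs with iterative message passing. Every `run` overload returns the final vertex states as an `RDD[(K, V)]`; the overloads differ only in whether the combiner, aggregator, partitioner, and storage level are supplied explicitly or left at their defaults (`DefaultCombiner`, no aggregator, `HashPartitioner`, `DEFAULT_STORAGE_LEVEL`).
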
```scala { .api }
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
object Bagel {
  /**
   * Runs a Bagel program with full feature support
   * @param sc SparkContext for distributed execution
   * @param vertices Initial vertex state as an RDD of (K, V) pairs, usually keyed by vertex ID
   * @param messages Initial messages as an RDD of (K, M) pairs, often empty
   * @param combiner Message combiner for reducing network traffic
   * @param aggregator Optional global aggregator across vertices
   * @param partitioner Partitioning strategy for vertices and messages
   * @param numPartitions Number of partitions for graph data
   * @param storageLevel Storage level for the RDDs cached in each superstep
   * @param compute User function taking (vertex, combined messages, aggregated value, superstep)
   *                and returning the new vertex state and its outgoing messages
   * @return Final vertex states once a superstep sends no messages and leaves no vertex active
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest, A: Manifest](
      sc: SparkContext,
      vertices: RDD[(K, V)],
      messages: RDD[(K, M)],
      combiner: Combiner[M, C],
      aggregator: Option[Aggregator[V, A]],
      partitioner: Partitioner,
      numPartitions: Int,
      storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
  )(
      compute: (V, Option[C], Option[A], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Runs without an aggregator, using the default storage level
   * @param compute User function taking (vertex, combined messages, superstep)
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
      sc: SparkContext,
      vertices: RDD[(K, V)],
      messages: RDD[(K, M)],
      combiner: Combiner[M, C],
      partitioner: Partitioner,
      numPartitions: Int
  )(
      compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Runs with a custom storage level, no aggregator
   * @param storageLevel RDD caching strategy for intermediate results
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
      sc: SparkContext,
      vertices: RDD[(K, V)],
      messages: RDD[(K, M)],
      combiner: Combiner[M, C],
      partitioner: Partitioner,
      numPartitions: Int,
      storageLevel: StorageLevel
  )(
      compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /** Runs with the default HashPartitioner and default storage level, no aggregator */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
      sc: SparkContext,
      vertices: RDD[(K, V)],
      messages: RDD[(K, M)],
      combiner: Combiner[M, C],
      numPartitions: Int
  )(
      compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /** Runs with the default HashPartitioner and a custom storage level, no aggregator */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
      sc: SparkContext,
      vertices: RDD[(K, V)],
      messages: RDD[(K, M)],
      combiner: Combiner[M, C],
      numPartitions: Int,
      storageLevel: StorageLevel
  )(
      compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Runs with the DefaultCombiner, HashPartitioner, and default storage level
   * @param compute Receives every message sent to the vertex, collected into an array
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
      sc: SparkContext,
      vertices: RDD[(K, V)],
      messages: RDD[(K, M)],
      numPartitions: Int
  )(
      compute: (V, Option[Array[M]], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /** Runs with the DefaultCombiner, HashPartitioner, and a custom storage level */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
      sc: SparkContext,
      vertices: RDD[(K, V)],
      messages: RDD[(K, M)],
      numPartitions: Int,
      storageLevel: StorageLevel
  )(
      compute: (V, Option[Array[M]], Int) => (V, Array[M])
  ): RDD[(K, V)]
}
```

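A sketch of the most general overload, wiring in a custom combiner, an aggregator, an explicit `HashPartitioner`, and a non-default storage level. `RankVertex`, `RankMessage`, `SumCombiner`, and `TotalRank` are hypothetical names, and the update rule mirrors the Basic Usage example. Because `SumCombiner` produces a `Double`, the compute function receives `Option[Double]` rather than `Option[Array[M]]`. The code assumes a spark-shell style script with `spark-bagel` on the classpath.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.bagel.{Aggregator, Bagel, Combiner, Message, Vertex}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Hypothetical vertex and message types (case classes are already Serializable)
case class RankVertex(rank: Double, outEdges: Array[String], active: Boolean) extends Vertex
case class RankMessage(targetId: String, share: Double) extends Message[String]

// Hypothetical combiner: sums all rank shares sent to one vertex, so C = Double
class SumCombiner extends Combiner[RankMessage, Double] with Serializable {
  def createCombiner(msg: RankMessage): Double = msg.share
  def mergeMsg(combiner: Double, msg: RankMessage): Double = combiner + msg.share
  def mergeCombiners(a: Double, b: Double): Double = a + b
}

// Hypothetical aggregator: sums ranks over the whole graph, so A = Double
class TotalRank extends Aggregator[RankVertex, Double] with Serializable {
  def createAggregator(vert: RankVertex): Double = vert.rank
  def mergeAggregators(a: Double, b: Double): Double = a + b
}

val sc = new SparkContext(new SparkConf().setAppName("BagelFullRun").setMaster("local[2]"))
val numPartitions = 2

val vertices: RDD[(String, RankVertex)] = sc.parallelize(Seq(
  "A" -> RankVertex(1.0, Array("B", "C"), active = true),
  "B" -> RankVertex(1.0, Array("C"), active = true),
  "C" -> RankVertex(1.0, Array("A"), active = true)))
val messages: RDD[(String, RankMessage)] = sc.parallelize(Seq.empty[(String, RankMessage)])

val result = Bagel.run(
  sc, vertices, messages, new SumCombiner, Some(new TotalRank),
  new HashPartitioner(numPartitions), numPartitions, StorageLevel.MEMORY_ONLY_SER
) { (v: RankVertex, sum: Option[Double], total: Option[Double], superstep: Int) =>
  // `sum` is the combined message value; `total` is Some(sum of all ranks) for this superstep
  val newRank = sum.map(s => 0.15 + 0.85 * s).getOrElse(v.rank)
  val halt = superstep >= 10
  val out =
    if (halt) Array.empty[RankMessage]
    else v.outEdges.map(t => RankMessage(t, newRank / v.outEdges.length))
  (RankVertex(newRank, v.outEdges, !halt), out)
}
```

Omitting the last argument of the first parameter list would fall back to `DEFAULT_STORAGE_LEVEL`.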
### Message Combining

Interface for combining multiple messages addressed to the same vertex to reduce network communication overhead. Bagel applies the combiner through `combineByKey`, so messages can be merged on the sending side before the shuffle and again on the receiving side; `createCombiner`, `mergeMsg`, and `mergeCombiners` must therefore be consistent with one another.

```scala { .api }
/**
 * Trait for combining messages to reduce network traffic
 * @tparam M Original message type
 * @tparam C Combined message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Combiner[M, C] {
  /** Create initial combiner from single message */
  def createCombiner(msg: M): C

  /** Merge message into existing combiner */
  def mergeMsg(combiner: C, msg: M): C

  /** Merge two combiners together */
  def mergeCombiners(a: C, b: C): C
}

/**
 * Default combiner that collects messages into an array without aggregating them
 * @tparam M Message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
  def createCombiner(msg: M): Array[M] = Array(msg)
  def mergeMsg(combiner: Array[M], msg: M): Array[M] = combiner :+ msg
  def mergeCombiners(a: Array[M], b: Array[M]): Array[M] = a ++ b
}
```

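Because a PageRank-style vertex only needs the sum of its incoming rank shares, a custom combiner can reduce all of a vertex's messages to a single `Double`. A sketch under that assumption, with hypothetical names `ShareMessage` and `ShareSumCombiner`; with this combiner `C = Double`, so the matching `compute` function receives `Option[Double]` instead of the `Option[Array[M]]` produced by `DefaultCombiner`.

```scala
import org.apache.spark.bagel.{Combiner, Message}

// Hypothetical message type carrying a share of the sender's rank
case class ShareMessage(targetId: String, share: Double) extends Message[String]

// Hypothetical combiner: collapses all messages for one vertex into their sum.
// The three methods must agree because partial sums may be merged in any order.
class ShareSumCombiner extends Combiner[ShareMessage, Double] with Serializable {
  // First message seen for a vertex within a partition
  def createCombiner(msg: ShareMessage): Double = msg.share
  // Fold another message into a partial sum
  def mergeMsg(combiner: Double, msg: ShareMessage): Double = combiner + msg.share
  // Merge partial sums computed in different partitions
  def mergeCombiners(a: Double, b: Double): Double = a + b
}
```

It would be passed as the `combiner` argument of any `run` overload that accepts one, for example `Bagel.run(sc, vertices, messages, new ShareSumCombiner, numPartitions)(compute)` with `compute: (V, Option[Double], Int) => (V, Array[ShareMessage])`.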
### Global Aggregation

Interface for performing a reduce across all vertices once per superstep. At the start of each superstep, Bagel applies `createAggregator` to every vertex, combines the results with `mergeAggregators`, and passes the value to every `compute` call in that superstep as `Some(value)`. Because the reduce runs in parallel, `mergeAggregators` should be associative and commutative.

```scala { .api }
/**
 * Trait for aggregating values across all vertices per superstep
 * @tparam V Vertex type
 * @tparam A Aggregated value type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Aggregator[V, A] {
  /** Create aggregator from single vertex */
  def createAggregator(vert: V): A

  /** Merge two aggregators together */
  def mergeAggregators(a: A, b: A): A
}
```

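As a sketch, an aggregator can compute several global statistics in one pass by using a tuple as `A`. The vertex type `ScoredVertex` and the class `ActivityStats` are hypothetical; the result reaches `compute` only through the full `run` overload, as an `Option[(Int, Double)]`.

```scala
import org.apache.spark.bagel.{Aggregator, Vertex}

// Hypothetical vertex type used only for this illustration
case class ScoredVertex(score: Double, active: Boolean) extends Vertex

// Hypothetical aggregator: (number of active vertices, sum of scores) over the whole graph
class ActivityStats extends Aggregator[ScoredVertex, (Int, Double)] with Serializable {
  // Per-vertex contribution
  def createAggregator(vert: ScoredVertex): (Int, Double) =
    (if (vert.active) 1 else 0, vert.score)

  // Associative and commutative, since partial results are reduced in any order
  def mergeAggregators(a: (Int, Double), b: (Int, Double)): (Int, Double) =
    (a._1 + b._1, a._2 + b._2)
}
```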
### Graph Data Types

Core abstractions for representing vertices and messages in the graph computation model. Implementations must also be serializable (`java.io.Serializable` or `scala.Serializable`).

```scala { .api }
/**
 * Base trait for graph vertices
 * All user vertex classes must extend this trait
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Vertex {
  /**
   * Whether this vertex is still active. `compute` is still called on inactive vertices;
   * the program stops once no vertex is active and no messages were sent in a superstep.
   */
  def active: Boolean
}

/**
 * Base trait for messages sent between vertices
 * @tparam K Key type for vertex identification
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Message[K] {
  /** ID of the destination vertex; messages whose target matches no vertex are dropped */
  def targetId: K
}
```

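A sketch of vertex and message types for a shortest-path style job, with hypothetical names `DistVertex` and `DistMessage`. Scala case classes already extend `Serializable`, so they meet the serialization requirement without an explicit `with Serializable`, and the `active` constructor field implements `Vertex.active`.

```scala
import org.apache.spark.bagel.{Message, Vertex}

// Hypothetical vertex state: best known distance from the source plus weighted out-edges.
// `active` is a case-class val, which implements Vertex.active.
case class DistVertex(distance: Double, outEdges: Seq[(String, Double)], active: Boolean)
  extends Vertex

// Hypothetical message proposing a new distance to the vertex identified by targetId
case class DistMessage(targetId: String, distance: Double) extends Message[String]
```

A vertex votes to halt by returning a copy of itself with `active = false`, for example `v.copy(active = false)`.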
### Constants

```scala { .api }
/** Member of object Bagel: default storage level for the RDDs cached in each superstep */
val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK
```

## Type Parameters

Every type parameter requires an implicit `Manifest`; the compiler supplies one automatically for concrete types.

- **K**: Key type for vertex identification
- **V**: Vertex type (must extend `Vertex`)
- **M**: Message type (must extend `Message[K]`)
- **C**: Combined message type (`Array[M]` when the `DefaultCombiner` is used)
- **A**: Aggregated value type (only used by the overload that takes an `Aggregator`)

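Generic code that forwards to `Bagel.run` has to carry the same context bounds; otherwise the compiler reports an error along the lines of "No Manifest available for K". A sketch, where `BagelJobs.runWithoutSeedMessages` is a hypothetical helper that starts a program with no messages in flight:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.bagel.{Bagel, Message, Vertex}
import org.apache.spark.rdd.RDD

object BagelJobs {
  // Hypothetical helper: runs a Bagel program whose initial message RDD is empty.
  // Repeating Bagel.run's context bounds lets the compiler forward the Manifests;
  // without them the call to Bagel.run below would not compile.
  def runWithoutSeedMessages[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
      sc: SparkContext,
      vertices: RDD[(K, V)],
      numPartitions: Int
  )(compute: (V, Option[Array[M]], Int) => (V, Array[M])): RDD[(K, V)] = {
    val noMessages: RDD[(K, M)] = sc.parallelize(Seq.empty[(K, M)])
    Bagel.run(sc, vertices, noMessages, numPartitions)(compute)
  }
}
```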
## Error Handling

Bagel operations can throw standard Spark and JVM exceptions:

- **SparkException**: For general Spark execution failures, including jobs aborted after repeated task failures
- **ClassCastException**: For type mismatches in user-defined functions
- **java.io.NotSerializableException**: When user classes are not serializable; if the offending object is captured in a closure, Spark typically reports it as a `SparkException` with the message "Task not serializable"

All user-defined vertex, message, combiner, and aggregator classes must extend `Serializable` (or `java.io.Serializable`) for distributed execution.

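A quick local check can catch a missing `Serializable` before a job is submitted. The hypothetical `SerializationCheck` helper below writes a value with plain Java serialization from `java.io`; this approximates what Spark's default Java serializer does with records and closures, but it is not Spark's own check and says nothing about Kryo, if that is configured.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper, not part of Spark or Bagel
object SerializationCheck {
  // Writes `value` with Java serialization and returns the number of bytes produced.
  // Throws java.io.NotSerializableException if any object reachable from `value`
  // does not implement Serializable.
  def serializedSize(value: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    bytes.size()
  }

  // True when `value` can be written with Java serialization
  def isSerializable(value: AnyRef): Boolean =
    try { serializedSize(value); true }
    catch { case _: NotSerializableException => false }
}
```

One would typically call `SerializationCheck.isSerializable` on a sample vertex, message, combiner, and aggregator instance. A serializable wrapper is not enough on its own: `isSerializable(Some(new Object))` is `false`, because every object reachable from the value must be serializable.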
## Migration Note

**Users should migrate to GraphX**: Bagel is deprecated, and GraphX provides better performance, a Pregel operator (`graph.pregel`) with similar superstep semantics, built-in algorithms such as PageRank and connected components, and tighter integration with Spark's ecosystem. See the [GraphX Programming Guide](https://spark.apache.org/docs/latest/graphx-programming-guide.html) for migration guidance.

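For orientation, the Basic Usage PageRank could be expressed with GraphX's `pregel` operator roughly as follows. This is a hedged sketch that assumes the `org.apache.spark:spark-graphx_2.10:1.6.3` artifact is on the classpath. GraphX requires `Long` vertex IDs, so vertices A, B, and C are renumbered 1, 2, and 3, and each edge carries the weight `1 / outDegree` of its source. Unlike Bagel's single `compute` function, the vertex program, message sender, and message merger are separate functions, and the merger plays the role of a combiner. GraphX counts iterations differently from Bagel's superstep index, so the number of rank updates may not match the Bagel example exactly.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

val sc = new SparkContext(new SparkConf().setAppName("GraphXPageRankSketch").setMaster("local[2]"))

// GraphX needs Long vertex IDs: A = 1, B = 2, C = 3 (same edges as the Bagel example)
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(1L, 3L, 1.0), Edge(2L, 3L, 1.0), Edge(3L, 1L, 1.0)))

// Edge weight = 1 / out-degree of the source; every vertex starts with rank 1.0
val base = Graph.fromEdges(edges, 1.0)
val weighted = base
  .outerJoinVertices(base.outDegrees) { (_, _, deg) => deg.getOrElse(0) }
  .mapTriplets(t => 1.0 / t.srcAttr)
  .mapVertices((_, _) => 1.0)

// vprog replaces Bagel's compute, sendMsg emits messages along edges,
// and mergeMsg sums incoming shares like a combiner
val ranks = weighted.pregel(0.0, 10)(
  (_, rank, msgSum) => if (msgSum != 0.0) 0.15 + 0.85 * msgSum else rank,
  triplet => Iterator((triplet.dstId, triplet.srcAttr * triplet.attr)),
  _ + _
)

ranks.vertices.collect().sortBy(_._1).foreach { case (id, rank) => println(s"$id: $rank") }
```

For real workloads, GraphX's built-in `graph.staticPageRank(numIter)` or `graph.pageRank(tol)` would normally replace a hand-written version like this.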