tessl/maven-org-apache-spark--spark-bagel_2-10

Bagel is a Spark implementation of Google's Pregel graph processing framework, deprecated and superseded by GraphX

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.apache.spark/spark-bagel_2.10@1.6.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-spark--spark-bagel_2-10@1.6.0

# Apache Spark Bagel

**⚠️ DEPRECATED**: Bagel is deprecated as of Spark 1.6.0 and superseded by [GraphX](https://spark.apache.org/docs/latest/graphx-programming-guide.html). This documentation is provided for legacy compatibility only.

Bagel is a Spark implementation of Google's [Pregel](http://portal.acm.org/citation.cfm?id=1807184) graph processing framework. It provides a distributed vertex-centric programming model for large-scale graph computation using iterative message passing between vertices in supersteps.

## Package Information

- **Package Name**: spark-bagel_2.10
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.spark
- **Artifact ID**: spark-bagel_2.10
- **Version**: 1.6.3
- **Installation**: Add Maven/SBT dependency for `org.apache.spark:spark-bagel_2.10:1.6.3`
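
For SBT users, the coordinates above might translate into a `build.sbt` fragment like the following. This is a sketch under assumptions: the Scala version (`2.10.6`) is inferred from the `_2.10` artifact suffix, and the `spark-core` line with `provided` scope is a common convention rather than something this package mandates.

```scala
// build.sbt (sketch; Scala version and spark-core scope are assumptions)
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // Assumed: Spark itself is supplied by the cluster at runtime
  "org.apache.spark" % "spark-core_2.10"  % "1.6.3" % "provided",
  // Coordinates taken from the package information above
  "org.apache.spark" % "spark-bagel_2.10" % "1.6.3"
)
```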

## Core Imports

```scala
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._
```

For specific components:

```scala
import org.apache.spark.bagel.{Bagel, Vertex, Message, Combiner, Aggregator}
```

For complete usage with Spark components:

```scala
import org.apache.spark.{SparkContext, HashPartitioner, Partitioner}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.Manifest
```

## Basic Usage

```scala
import org.apache.spark.SparkContext
import org.apache.spark.bagel._
import org.apache.spark.bagel.Bagel._

// Assumes an existing SparkContext named `sc` (for example, the one provided by spark-shell)

// Define custom vertex class
class PageRankVertex(
  val id: String,
  val rank: Double,
  val outEdges: Seq[String],
  val active: Boolean
) extends Vertex with Serializable

// Define custom message class
class PageRankMessage(
  val targetId: String,
  val rankShare: Double
) extends Message[String] with Serializable

// Load graph data as (vertexId, vertex) pairs
val vertices = sc.parallelize(Array(
  ("A", new PageRankVertex("A", 1.0, Seq("B", "C"), true)),
  ("B", new PageRankVertex("B", 1.0, Seq("C"), true)),
  ("C", new PageRankVertex("C", 1.0, Seq("A"), true))
))

// Start with no messages
val messages = sc.parallelize(Array[(String, PageRankMessage)]())

// Define compute function
def compute(
  vertex: PageRankVertex,
  msgs: Option[Array[PageRankMessage]],
  superstep: Int
): (PageRankVertex, Array[PageRankMessage]) = {
  // Typed empty arrays keep the element type from being inferred as Nothing
  val msgSum = msgs.getOrElse(Array.empty[PageRankMessage]).map(_.rankShare).sum
  val newRank = if (msgSum != 0) 0.15 + 0.85 * msgSum else vertex.rank
  val halt = superstep >= 10

  // Split the new rank evenly across outgoing edges until halting
  val outMsgs: Array[PageRankMessage] =
    if (!halt) {
      vertex.outEdges.map(targetId =>
        new PageRankMessage(targetId, newRank / vertex.outEdges.size)
      ).toArray
    } else Array.empty[PageRankMessage]

  (new PageRankVertex(vertex.id, newRank, vertex.outEdges, !halt), outMsgs)
}

// Run Bagel program with 3 partitions and the default combiner
val result = Bagel.run(sc, vertices, messages, 3)(compute)
```

## Architecture

Bagel implements the Pregel computational model with these key components:

- **Vertex-Centric Model**: Computation focuses on individual vertices rather than edges
- **Superstep Iterations**: Program executes in synchronized iterations called supersteps
- **Message Passing**: Vertices communicate only through messages sent to other vertices
- **Combiners**: Optional message aggregation to reduce network traffic
- **Aggregators**: Global reduce operations across all vertices per superstep
- **Distributed Storage**: Uses Spark RDDs for distributed vertex and message storage
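
To make the superstep and message-passing ideas concrete, here is a minimal single-machine sketch of a Pregel-style loop in plain Scala. It does not use Spark or the Bagel API: `V`, `Msg`, `Compute`, `run`, and `demo` are hypothetical names introduced for illustration, and the stopping rule (no messages in flight and no active vertices) is an assumption of this sketch.

```scala
object SuperstepSketch {
  // Hypothetical local stand-ins for a vertex and a message (not the Bagel types)
  final case class V(id: String, value: Double, outEdges: Seq[String], active: Boolean)
  final case class Msg(targetId: String, value: Double)

  // A vertex program: (vertex, inbox, superstep) => (new vertex, outgoing messages)
  type Compute = (V, Option[Seq[Msg]], Int) => (V, Seq[Msg])

  def run(vertices: Map[String, V], initial: Seq[Msg])(compute: Compute): Map[String, V] = {
    @annotation.tailrec
    def loop(vs: Map[String, V], msgs: Seq[Msg], superstep: Int): Map[String, V] = {
      // Message passing: deliver each message to the inbox of its target vertex
      val inbox: Map[String, Seq[Msg]] = msgs.groupBy(_.targetId)
      // Vertex-centric step: each vertex sees only its own state and inbox
      val results = vs.map { case (id, v) => (id, compute(v, inbox.get(id), superstep)) }
      val nextVs = results.map { case (id, (v, _)) => (id, v) }
      val nextMsgs = results.values.flatMap(_._2).toSeq
      // Assumed stopping rule: nothing in flight and no vertex still active
      if (nextMsgs.isEmpty && nextVs.values.forall(v => !v.active)) nextVs
      else loop(nextVs, nextMsgs, superstep + 1)
    }
    loop(vertices, initial, 0)
  }

  // Tiny ring graph A -> B -> C -> A: each vertex sends 1.0 once, then sums what it receives
  def demo(): Map[String, Double] = {
    val graph = Map(
      "A" -> V("A", 0.0, Seq("B"), active = true),
      "B" -> V("B", 0.0, Seq("C"), active = true),
      "C" -> V("C", 0.0, Seq("A"), active = true)
    )
    val compute: Compute = (v, in, step) => {
      val received = in.map(_.map(_.value).sum).getOrElse(0.0)
      val out = if (step == 0) v.outEdges.map(t => Msg(t, 1.0)) else Seq.empty[Msg]
      (v.copy(value = v.value + received, active = false), out)
    }
    run(graph, Seq.empty)(compute).map { case (id, v) => (id, v.value) }
  }
}
```

Calling `SuperstepSketch.demo()` runs two supersteps: the first sends one message along each edge, the second adds the received value to each vertex, after which no messages remain and the loop stops.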

## Capabilities

### Graph Computation

Core Bagel computation engine that executes Pregel-style vertex programs with iterative message passing.

```scala { .api }
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
object Bagel {
  /**
   * Runs a Bagel program with full feature support
   * @param sc SparkContext for distributed execution
   * @param vertices Initial vertex state as RDD of (K, V) pairs
   * @param messages Initial messages as RDD of (K, M) pairs
   * @param combiner Message combiner for reducing network traffic
   * @param aggregator Optional global aggregator across vertices
   * @param partitioner Partitioning strategy for distributed data
   * @param numPartitions Number of partitions for graph data
   * @param storageLevel Storage level for intermediate RDDs
   * @param compute User-defined vertex computation function
   * @return Final vertex states after convergence
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest, A: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    aggregator: Option[Aggregator[V, A]],
    partitioner: Partitioner,
    numPartitions: Int,
    storageLevel: StorageLevel = DEFAULT_STORAGE_LEVEL
  )(
    compute: (V, Option[C], Option[A], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Simplified run without an aggregator, using the default storage level
   * @param compute User function taking (vertex, combinedMessages, superstep)
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    partitioner: Partitioner,
    numPartitions: Int
  )(
    compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Run with custom storage level, no aggregator
   * @param storageLevel RDD caching strategy for intermediate results
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    partitioner: Partitioner,
    numPartitions: Int,
    storageLevel: StorageLevel
  )(
    compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Run with default HashPartitioner, no aggregator
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    numPartitions: Int
  )(
    compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Run with default HashPartitioner and custom storage
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    combiner: Combiner[M, C],
    numPartitions: Int,
    storageLevel: StorageLevel
  )(
    compute: (V, Option[C], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Run with DefaultCombiner, HashPartitioner, and default storage
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    numPartitions: Int
  )(
    compute: (V, Option[Array[M]], Int) => (V, Array[M])
  ): RDD[(K, V)]

  /**
   * Run with DefaultCombiner, HashPartitioner, and custom storage
   */
  def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
    sc: SparkContext,
    vertices: RDD[(K, V)],
    messages: RDD[(K, M)],
    numPartitions: Int,
    storageLevel: StorageLevel
  )(
    compute: (V, Option[Array[M]], Int) => (V, Array[M])
  ): RDD[(K, V)]
}
```

### Message Combining

Interface for combining multiple messages to the same vertex to reduce network communication overhead.

```scala { .api }
/**
 * Trait for combining messages to reduce network traffic
 * @tparam M Original message type
 * @tparam C Combined message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Combiner[M, C] {
  /** Create initial combiner from single message */
  def createCombiner(msg: M): C

  /** Merge message into existing combiner */
  def mergeMsg(combiner: C, msg: M): C

  /** Merge two combiners together */
  def mergeCombiners(a: C, b: C): C
}

/**
 * Default combiner that appends messages without actual combining
 * @tparam M Message type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
class DefaultCombiner[M: Manifest] extends Combiner[M, Array[M]] with Serializable {
  def createCombiner(msg: M): Array[M]
  def mergeMsg(combiner: Array[M], msg: M): Array[M]
  def mergeCombiners(a: Array[M], b: Array[M]): Array[M]
}
```
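
As an illustration of the three methods, the sketch below implements a combiner that sums rank contributions into a single `Double`. To keep it runnable without Spark on the classpath, it declares a local copy of the `Combiner` trait with the signature documented above; with Bagel available you would presumably import `org.apache.spark.bagel.Combiner` instead. `RankMessage`, `RankSumCombiner`, and the `combine` helper are hypothetical names, and `combine` only imitates how a combiner could be folded over one batch of messages.

```scala
object CombinerSketch {
  // Local mirror of the documented trait, so the snippet runs without Spark
  trait Combiner[M, C] {
    def createCombiner(msg: M): C
    def mergeMsg(combiner: C, msg: M): C
    def mergeCombiners(a: C, b: C): C
  }

  // Hypothetical message type carrying a share of a vertex's rank
  final case class RankMessage(targetId: String, rankShare: Double)

  // Collapses all messages for a vertex into the sum of their rank shares
  class RankSumCombiner extends Combiner[RankMessage, Double] with Serializable {
    def createCombiner(msg: RankMessage): Double = msg.rankShare
    def mergeMsg(combiner: Double, msg: RankMessage): Double = combiner + msg.rankShare
    def mergeCombiners(a: Double, b: Double): Double = a + b
  }

  // Illustration only: fold a combiner over one batch of messages
  def combine[M, C](c: Combiner[M, C], msgs: Seq[M]): Option[C] =
    msgs.headOption.map(first => msgs.tail.foldLeft(c.createCombiner(first))(c.mergeMsg))
}
```

With a combiner like this, the `compute` function passed to the combiner-taking `run` overloads receives `Option[Double]` (the `Option[C]` in the documented signature) rather than an array of individual messages.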

### Global Aggregation

Interface for performing reduce operations across all vertices after each superstep.

```scala { .api }
/**
 * Trait for aggregating values across all vertices per superstep
 * @tparam V Vertex type
 * @tparam A Aggregated value type
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Aggregator[V, A] {
  /** Create aggregator from single vertex */
  def createAggregator(vert: V): A

  /** Merge two aggregators together */
  def mergeAggregators(a: A, b: A): A
}
```
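
A possible aggregator, sketched in plain Scala: it tracks the total rank and the vertex count so that a `compute` function could derive the mean rank from the aggregated value. The local `Aggregator` trait copies the documented signature so the snippet runs without Spark. `RankVertex`, `RankStats`, and `aggregate` are hypothetical, and `aggregate` merely imitates a map-then-reduce over all vertices.

```scala
object AggregatorSketch {
  // Local mirror of the documented trait, so the snippet runs without Spark
  trait Aggregator[V, A] {
    def createAggregator(vert: V): A
    def mergeAggregators(a: A, b: A): A
  }

  // Hypothetical vertex; with Bagel it would also extend Vertex
  final case class RankVertex(id: String, rank: Double, active: Boolean)

  // Aggregated value: (sum of ranks, number of vertices)
  class RankStats extends Aggregator[RankVertex, (Double, Int)] with Serializable {
    def createAggregator(vert: RankVertex): (Double, Int) = (vert.rank, 1)
    def mergeAggregators(a: (Double, Int), b: (Double, Int)): (Double, Int) =
      (a._1 + b._1, a._2 + b._2)
  }

  // Illustration only: map every vertex to an aggregate, then reduce pairwise
  def aggregate[V, A](agg: Aggregator[V, A], vs: Seq[V]): Option[A] =
    vs.map(agg.createAggregator).reduceOption(agg.mergeAggregators)
}
```

Because `mergeAggregators` may be applied in any grouping across partitions, an associative and commutative merge such as the pairwise sum used here is the safer design choice.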

### Graph Data Types

Core abstractions for representing vertices and messages in the graph computation model.

```scala { .api }
/**
 * Base trait for graph vertices
 * All user vertex classes must extend this trait
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Vertex {
  /** Whether vertex should continue computation in next superstep */
  def active: Boolean
}

/**
 * Base trait for messages sent between vertices
 * @tparam K Key type for vertex identification
 */
@deprecated("Uses of Bagel should migrate to GraphX", "1.6.0")
trait Message[K] {
  /** ID of destination vertex for this message */
  def targetId: K
}
```
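
One way to satisfy these traits is with case classes, which are serializable by default and provide `copy` for producing updated vertex state. The sketch below uses local copies of `Vertex` and `Message` so it runs without Spark. `DistanceVertex`, `DistanceMessage`, and `step` are hypothetical names for a hop-count style program, not part of Bagel.

```scala
object DataTypesSketch {
  // Local mirrors of the documented traits, so the snippet runs without Spark
  trait Vertex { def active: Boolean }
  trait Message[K] { def targetId: K }

  // Hypothetical vertex holding the best known hop count from some source
  final case class DistanceVertex(
    id: Long,
    distance: Int,
    neighbours: Seq[Long],
    active: Boolean
  ) extends Vertex

  // Hypothetical message offering a candidate distance to its target
  final case class DistanceMessage(targetId: Long, distance: Int) extends Message[Long]

  // One compute step: adopt a shorter distance if offered and tell the neighbours
  def step(
    v: DistanceVertex,
    incoming: Option[Array[DistanceMessage]]
  ): (DistanceVertex, Array[DistanceMessage]) = {
    val best = incoming
      .getOrElse(Array.empty[DistanceMessage])
      .map(_.distance)
      .foldLeft(v.distance)((a, b) => math.min(a, b))
    if (best < v.distance) {
      val out = v.neighbours.map(n => DistanceMessage(n, best + 1)).toArray
      (v.copy(distance = best, active = false), out)
    } else {
      // Nothing improved: go inactive and send nothing
      (v.copy(active = false), Array.empty[DistanceMessage])
    }
  }
}
```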

### Constants

```scala { .api }
/** Default storage level for intermediate RDDs (MEMORY_AND_DISK) */
val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK
```

## Type Parameters

- **K**: Key type for vertex identification (must have Manifest)
- **V**: Vertex type (must extend Vertex and have Manifest)
- **M**: Message type (must extend Message[K] and have Manifest)
- **C**: Combined message type (must have Manifest)
- **A**: Aggregated value type (must have Manifest)

## Error Handling

Bagel operations can fail with standard Spark and JVM exceptions:

- **SparkException**: For general Spark execution failures
- **ClassCastException**: For type mismatches in user-defined functions
- **NotSerializableException**: When user classes are not properly serializable (Spark typically reports this as `SparkException: Task not serializable`)

All user-defined vertex, message, combiner, and aggregator classes must extend `Serializable` (or `java.io.Serializable`) for distributed execution.
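
Serialization problems often surface only once a job runs on a cluster, so a quick local check can help. The helper below is a sketch, not part of Bagel or Spark: it assumes plain Java serialization is in use and tests whether an object can be written with `ObjectOutputStream`. `isJavaSerializable`, `GoodMessage`, and `BadMessage` are hypothetical names.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  /** Returns true if `value` can be written with Java serialization. */
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  // Case classes are Serializable by default
  final case class GoodMessage(targetId: String, value: Double)

  // A plain class that does not extend Serializable
  class BadMessage(val targetId: String, val value: Double)
}
```

Running such a check against sample vertex, message, combiner, and aggregator instances in a unit test is one way to catch a missing `Serializable` before submitting a job.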

## Migration Note

**Users should migrate to GraphX**: Bagel is deprecated as of Spark 1.6.0 and is no longer shipped from Spark 2.0 onward. GraphX provides better performance, more built-in graph algorithms, and tighter integration with Spark's ecosystem. See the [GraphX Programming Guide](https://spark.apache.org/docs/latest/graphx-programming-guide.html) for migration guidance.