# Pregel API

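Vertex-centric programming framework for implementing custom iterative graph algorithms using the Pregel computational model. The Pregel API enables distributed graph computation through a message-passing paradigm. Computation proceeds in supersteps: in each superstep, the vertices that received messages update their state and then send messages to their neighbors for the next superstep. The computation ends when no messages are sent or a maximum number of iterations is reached.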

## Capabilities

### Core Pregel Framework

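The main Pregel computation API for implementing vertex-centric iterative algorithms. It is exposed in two forms: as the `pregel` method on any `Graph` (through `GraphOps`), and as the standalone `Pregel` object, to which the method delegates.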

```scala { .api }
// Defined on GraphOps[VD, ED]; callable on any Graph[VD, ED] through an implicit conversion
/**
 * Execute a Pregel computation on the graph
 * @param initialMsg Message that every vertex receives in the first superstep
 * @param maxIterations Maximum number of iterations (default: Int.MaxValue, i.e. run until no messages are sent)
 * @param activeDirection Which edges of the vertices that received a message in the previous superstep run sendMsg (default: Either)
 * @param vprog Vertex program, run on each vertex that received a message: (VertexId, VertexData, Message) => NewVertexData
 * @param sendMsg Send message function: EdgeTriplet => Iterator[(VertexId, Message)]
 * @param mergeMsg Merge messages function, must be commutative and associative: (Message, Message) => Message
 * @return New graph with updated vertex attributes
 */
def pregel[A: ClassTag](
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either
)(
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A
): Graph[VD, ED]

object Pregel {
  /**
   * Execute a Pregel computation (standalone object version; GraphOps.pregel delegates to it)
   * @param graph Input graph
   * @param initialMsg Message that every vertex receives in the first superstep
   * @param maxIterations Maximum number of iterations
   * @param activeDirection Which edges of the vertices that received a message run sendMsg
   * @param vprog Vertex program function
   * @param sendMsg Message sending function
   * @param mergeMsg Message merging function (commutative and associative)
   * @return Updated graph
   */
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
      graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either
  )(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A
  ): Graph[VD, ED]
}
```

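### Edge Direction Control

Control which edges run `sendMsg`. In the first superstep, `sendMsg` runs on every edge. After that, only edges incident to vertices that received a message in the previous superstep are candidates, and `activeDirection` selects among them.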

```scala { .api }
object EdgeDirection {
  /** Edges arriving at a vertex (the vertex is the destination) */
  val In: EdgeDirection

  /** Edges originating from a vertex (the vertex is the source) */
  val Out: EdgeDirection

  /** Edges originating from or arriving at a vertex; in Pregel, sendMsg runs if either endpoint received a message */
  val Either: EdgeDirection

  /** Edges originating from and arriving at a vertex; in Pregel, sendMsg runs only if both endpoints received a message */
  val Both: EdgeDirection
}

class EdgeDirection {
  /** Reverse the direction: In and Out swap; Either and Both are unchanged */
  def reverse: EdgeDirection
}
```
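
The way `activeDirection` filters edges can be modeled in a few lines of plain Scala. `Dir`, `reverse`, and `runsSendMsg` are hypothetical stand-ins for this sketch, not GraphX APIs. The mapping follows the GraphX Pregel documentation:

- With `Out`, an edge runs when its source received a message in the previous superstep.
- With `In`, an edge runs when its destination did.
- With `Either`, an edge runs when at least one endpoint did.
- With `Both`, an edge runs only when both endpoints did.

```scala
// Plain-Scala model of the activeDirection rule; `Dir` is a stand-in, not GraphX's EdgeDirection
sealed trait Dir

object Dir {
  case object In extends Dir
  case object Out extends Dir
  case object Either extends Dir
  case object Both extends Dir

  // Mirrors EdgeDirection.reverse: In and Out swap, Either and Both are unchanged
  def reverse(d: Dir): Dir = d match {
    case In    => Out
    case Out   => In
    case other => other
  }

  // Does sendMsg run on an edge, given which endpoints received a message last superstep?
  def runsSendMsg(d: Dir, srcActive: Boolean, dstActive: Boolean): Boolean = d match {
    case Out    => srcActive
    case In     => dstActive
    case Either => srcActive || dstActive
    case Both   => srcActive && dstActive
  }
}
```

The default, `Either`, is the most permissive rule that still skips edges where neither endpoint changed. This is why algorithms that propagate values in both directions, such as label propagation, usually keep it.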

### Message Aggregation Integration

The Pregel API builds on the same message-aggregation machinery as the lower-level `aggregateMessages` function. That function is also available directly for single-pass aggregation.

```scala { .api }
/**
 * Aggregate messages from each vertex's neighboring edges in a single pass
 * @param sendMsg Function run on each edge; sends messages through the EdgeContext
 * @param mergeMsg Commutative, associative function combining messages sent to the same vertex
 * @param tripletFields Which triplet fields sendMsg reads, so unused vertex attributes need not be shipped
 * @return VertexRDD with the aggregated message for every vertex that received at least one message
 */
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All
): VertexRDD[A]

/**
 * Context passed to sendMsg in aggregateMessages
 */
abstract class EdgeContext[VD, ED, A] {
  def srcId: VertexId
  def dstId: VertexId
  def srcAttr: VD
  def dstAttr: VD
  def attr: ED

  /** Send message to source vertex */
  def sendToSrc(msg: A): Unit

  /** Send message to destination vertex */
  def sendToDst(msg: A): Unit

  /** View this context as an EdgeTriplet */
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}
```
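
The contract above can be modeled on a single machine in plain Scala. `sendMsg` runs once per edge against a context, and messages addressed to the same vertex are combined with `mergeMsg`. `LocalEdgeContext` and `localAggregateMessages` are hypothetical names for this sketch, not GraphX APIs. The model ignores partitioning and `tripletFields`.

```scala
import scala.collection.mutable

// Hypothetical single-machine stand-in for EdgeContext (not the GraphX class)
final class LocalEdgeContext[VD, ED, A](
    val srcId: Long, val dstId: Long,
    val srcAttr: VD, val dstAttr: VD, val attr: ED,
    emit: (Long, A) => Unit) {
  def sendToSrc(msg: A): Unit = emit(srcId, msg)
  def sendToDst(msg: A): Unit = emit(dstId, msg)
}

// Runs sendMsg once per edge and merges messages per target vertex. As with
// aggregateMessages, vertices that receive no message are absent from the result.
def localAggregateMessages[VD, ED, A](
    vertices: Map[Long, VD],
    edges: Seq[(Long, Long, ED)],
    sendMsg: LocalEdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A): Map[Long, A] = {
  val acc = mutable.Map.empty[Long, A]
  val emit = (id: Long, msg: A) => { acc(id) = acc.get(id).fold(msg)(mergeMsg(_, msg)) }
  for ((src, dst, attr) <- edges)
    sendMsg(new LocalEdgeContext(src, dst, vertices(src), vertices(dst), attr, emit))
  acc.toMap
}

// Usage: count degrees; vertex 5 has no edges, so it gets no entry
val degrees = localAggregateMessages[String, Unit, Int](
  Map(1L -> "a", 2L -> "b", 3L -> "c", 4L -> "d", 5L -> "e"),
  Seq((1L, 2L, ()), (2L, 3L, ()), (3L, 1L, ()), (3L, 4L, ())),
  ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) },
  _ + _)
```

Because messages are merged in whatever order edges are visited, `mergeMsg` has to be commutative and associative for the result to be well defined. The same requirement applies in the distributed setting.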

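## Pregel Algorithm Patterns

### Single Source Shortest Path (SSSP)

Classic shortest path algorithm using Pregel message passing. Each vertex holds its best known distance from the source. A vertex sends a message along an out-edge only when it can shorten the destination's distance, so the computation stops once no distance improves.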

```scala
import org.apache.spark.graphx._

def shortestPaths(graph: Graph[Long, Double], sourceId: VertexId): Graph[Double, Double] = {
  // Initialize distances: 0 for the source, infinity for every other vertex
  val initialGraph = graph.mapVertices((id, _) =>
    if (id == sourceId) 0.0 else Double.PositiveInfinity
  )

  // Pregel computation; messages follow edge direction, so distances are directed
  initialGraph.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the shorter of the current and the proposed distance
    vprog = (id, dist, newDist) => math.min(dist, newDist),

    // Send message: propose a shorter distance to the destination when one exists
    sendMsg = triplet => {
      if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
      } else {
        Iterator.empty
      }
    },

    // Merge messages: take minimum distance
    mergeMsg = (a, b) => math.min(a, b)
  )
}

// Usage (assumes `graph: Graph[Long, Double]` whose edge attributes are edge weights)
val sourceVertex = 1L
val distances = shortestPaths(graph, sourceVertex).vertices
distances.collect().foreach { case (id, dist) =>
  println(s"Distance from $sourceVertex to $id: $dist")
}
```

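### Connected Components with Pregel

Find connected components using iterative label propagation. Each vertex starts with its own ID as its label and adopts the smallest label it receives from a neighbor. Messages flow in both edge directions, so at convergence every vertex in a weakly connected component holds the smallest vertex ID of that component.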

```scala
import org.apache.spark.graphx._
import scala.reflect.ClassTag

def connectedComponents[ED: ClassTag](graph: Graph[Long, ED]): Graph[VertexId, ED] = {
  // Initialize each vertex with its own ID as component label
  val initialGraph = graph.mapVertices((id, _) => id)

  initialGraph.pregel(Long.MaxValue)(
    // Vertex program: adopt smaller component ID
    vprog = (id, oldLabel, newLabel) => math.min(oldLabel, newLabel),

    // Send message: push the smaller label across the edge in whichever direction
    // it improves the other endpoint (edge direction is otherwise ignored)
    sendMsg = triplet => {
      if (triplet.srcAttr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr))
      } else if (triplet.dstAttr < triplet.srcAttr) {
        Iterator((triplet.srcId, triplet.dstAttr))
      } else {
        Iterator.empty
      }
    },

    // Merge messages: take minimum component ID
    mergeMsg = (a, b) => math.min(a, b)
  )
}
```

### PageRank with Pregel

Implement the PageRank algorithm using the Pregel framework. This version runs a fixed number of iterations and does not normalize the scores to sum to 1.
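
Written out, each iteration applies the update below. Here $\alpha$ is `resetProb`, $N_{\text{in}}(v)$ is the set of in-neighbors of $v$, and $d_{\text{out}}(u)$ is the out-degree of $u$; this notation is introduced for illustration.

$$
\mathrm{PR}_{t+1}(v) = \alpha + (1 - \alpha) \sum_{u \in N_{\text{in}}(v)} \frac{\mathrm{PR}_t(u)}{d_{\text{out}}(u)}
$$

In Pregel terms, each term of the sum is one message along an edge $u \to v$, `mergeMsg` computes the sum, and `vprog` applies the outer update.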

```scala
import org.apache.spark.graphx._

def pageRank(graph: Graph[Double, Double], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
  // Pair each vertex's rank with its out-degree (0 for vertices without out-edges)
  val graphWithDegrees = graph.outerJoinVertices(graph.outDegrees)((id, _, degOpt) =>
    (resetProb, degOpt.getOrElse(0))
  )

  // Run Pregel for a fixed number of iterations. The message type is Double (a rank
  // contribution). With initial message 0.0, the first vertex-program call sets every
  // rank to resetProb; vertices without in-edges never receive another message, so they
  // keep that value, which is their correct rank.
  graphWithDegrees.pregel(0.0, numIter)(
    // Vertex program: new rank from the summed contributions
    vprog = (id, attr, msgSum) => {
      val (_, outDegree) = attr
      (resetProb + (1.0 - resetProb) * msgSum, outDegree)
    },

    // Send message: split the source's rank evenly across its out-edges
    sendMsg = triplet => {
      val (srcRank, srcOutDegree) = triplet.srcAttr
      if (srcOutDegree > 0) Iterator((triplet.dstId, srcRank / srcOutDegree))
      else Iterator.empty
    },

    // Merge messages: sum all incoming rank contributions
    mergeMsg = (a, b) => a + b
  ).mapVertices((id, attr) => attr._1) // Extract just the rank
}
```

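### Collaborative Filtering with Pregel

Matrix factorization using alternating least squares (ALS) implemented with Pregel. ALS alternates between two half-steps. With item factors fixed, each user's factor is the solution of a small regularized least-squares problem over the items that user rated; then the roles are swapped.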
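
For reference, the least-squares half-step for a single user $u$ has a closed form. Hold the item factors $y_i$ fixed, and let $r_{ui}$ be the ratings over the set $R(u)$ of items that $u$ rated, with regularization weight $\lambda$. Plain $\lambda \mathbf{I}$ regularization is assumed here; some ALS variants scale $\lambda$ by $|R(u)|$. The new user factor $x_u$ then solves

$$
\Big( \sum_{i \in R(u)} y_i y_i^{\top} + \lambda \mathbf{I} \Big)\, x_u = \sum_{i \in R(u)} r_{ui}\, y_i
$$

The item half-step is symmetric. Both sums decompose over edges, so a message can carry one rating's $y_i y_i^{\top}$ and $r_{ui}\, y_i$, `mergeMsg` can add them element-wise, and `vprog` can solve the resulting $k \times k$ system, where $k$ is the factor rank.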

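```scala
import org.apache.spark.graphx._

// Assumes a bipartite ratings graph whose edges point user -> item and carry the rating.
// Each vertex attribute becomes a latent-factor vector of length `rank`.

// Solves A x = b for a small dense k x k system (A stored row-major) by Gaussian elimination.
// A = Y^T Y + lambda * I is symmetric positive definite for lambda > 0, so no pivoting is needed.
def solve(a0: Array[Double], b0: Array[Double], k: Int): Array[Double] = {
  val a = a0.clone()
  val b = b0.clone()
  for (p <- 0 until k; r <- p + 1 until k) {
    val f = a(r * k + p) / a(p * k + p)
    for (c <- p until k) a(r * k + c) -= f * a(p * k + c)
    b(r) -= f * b(p)
  }
  val x = new Array[Double](k)
  for (r <- k - 1 to 0 by -1) {
    var s = b(r)
    for (c <- r + 1 until k) s -= a(r * k + c) * x(c)
    x(r) = s / a(r * k + r)
  }
  x
}

def alternatingLeastSquares(
    graph: Graph[Double, Double], // ratings graph; vertex attributes are ignored
    rank: Int,
    numIter: Int,
    lambda: Double = 0.1          // regularization strength (illustrative default)
): Graph[Array[Double], Double] = {

  // Message: (flattened sum of y * y^T, sum of rating * y) over a vertex's rated neighbors
  type Msg = (Array[Double], Array[Double])
  val add = (a: Array[Double], b: Array[Double]) => a.zip(b).map { case (x, y) => x + y }

  // One half-step: re-solve every user factor (updateUsers = true) or every item factor,
  // holding the other side fixed. The empty initial message means "keep the current factor",
  // and maxIterations = 1 runs exactly one superstep after that no-op.
  def halfStep(cur: Graph[Array[Double], Double], updateUsers: Boolean) =
    cur.pregel[Msg]((Array.empty[Double], Array.empty[Double]), maxIterations = 1)(
      vprog = (_, factor, msg) =>
        if (msg._2.isEmpty) factor
        else {
          val yty = msg._1.clone()
          for (i <- 0 until rank) yty(i * rank + i) += lambda
          solve(yty, msg._2, rank)
        },
      sendMsg = t => {
        // Send the fixed side's factor y, with the rating, to the vertex being updated
        val (target, y) = if (updateUsers) (t.srcId, t.dstAttr) else (t.dstId, t.srcAttr)
        val yyT = Array.tabulate(rank * rank)(i => y(i / rank) * y(i % rank))
        Iterator((target, (yyT, y.map(_ * t.attr))))
      },
      mergeMsg = (m1, m2) => (add(m1._1, m2._1), add(m1._2, m2._2))
    )

  // Small random initial factors, seeded by vertex ID so values differ across partitions
  var current = graph.mapVertices { (id, _) =>
    val rnd = new scala.util.Random(id)
    Array.fill(rank)(rnd.nextGaussian() * 0.1)
  }

  // Alternate between updating user and item factors
  for (_ <- 0 until numIter) {
    current = halfStep(current, updateUsers = true)
    current = halfStep(current, updateUsers = false)
  }
  current
}
```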

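## Advanced Pregel Patterns

### Multi-Phase Algorithms

Some algorithms require multiple Pregel phases with different logic. Each phase is a separate `pregel` call with its own message type and functions, and the graph returned by one phase is the input to the next.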

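```scala
import org.apache.spark.graphx._
import scala.reflect.ClassTag

// Each phase's message type and functions are supplied by the caller
def twoPhaseAlgorithm[VD: ClassTag, ED: ClassTag, A1: ClassTag, A2: ClassTag](graph: Graph[VD, ED])(
    initialMsg1: A1, vprog1: (VertexId, VD, A1) => VD,
    sendMsg1: EdgeTriplet[VD, ED] => Iterator[(VertexId, A1)], mergeMsg1: (A1, A1) => A1)(
    initialMsg2: A2, vprog2: (VertexId, VD, A2) => VD,
    sendMsg2: EdgeTriplet[VD, ED] => Iterator[(VertexId, A2)], mergeMsg2: (A2, A2) => A2
): Graph[VD, ED] = {
  // Phase 1: e.g. a forward pass
  val phase1Result = graph.pregel(initialMsg1)(vprog1, sendMsg1, mergeMsg1)

  // Phase 2: e.g. a backward pass with different logic, starting from phase 1's vertex values
  val phase2Result = phase1Result.pregel(initialMsg2)(vprog2, sendMsg2, mergeMsg2)

  phase2Result
}
```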

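### Convergence Detection

Pregel already stops on its own when a superstep sends no messages. The simplest convergence test therefore lives in `sendMsg`: send messages only while a vertex's value still changes by more than a tolerance. When convergence depends on a global measure instead, the pattern below wraps `pregel` runs in a loop and compares vertex attributes between runs.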

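```scala
import org.apache.spark.graphx._
import scala.reflect.ClassTag

// The update, send, merge and change functions are supplied by the caller.
// updateFunction should leave a vertex unchanged for initialMsg, because every pregel
// call first applies the vertex program to all vertices with that message.
// computeChange is assumed to return a non-negative magnitude.
def convergedPregelAlgorithm[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    tolerance: Double,
    initialMsg: A,
    updateFunction: (VD, A) => VD,
    sendFunction: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeFunction: (A, A) => A,
    computeChange: (VD, VD) => Double,
    stepsPerCheck: Int = 10,
    maxRounds: Int = 100
): Graph[VD, ED] = {

  var currentGraph = graph
  var converged = false
  var round = 0

  while (!converged && round < maxRounds) {
    val previousGraph = currentGraph

    // Run a bounded number of supersteps (pregel stops earlier if no messages are sent)
    currentGraph = currentGraph.pregel(initialMsg, stepsPerCheck)(
      vprog = (id, oldAttr, msg) => updateFunction(oldAttr, msg),
      sendMsg = sendFunction,
      mergeMsg = mergeFunction
    )

    // Check convergence by comparing vertex attributes before and after this round
    val maxChange = previousGraph.vertices
      .join(currentGraph.vertices)
      .map { case (_, (oldAttr, newAttr)) => computeChange(oldAttr, newAttr) }
      .fold(0.0)((a, b) => math.max(a, b))

    converged = maxChange < tolerance
    // Release the previous round's graph, but never the caller's input graph
    if (round > 0) previousGraph.unpersist(blocking = false)
    round += 1
  }

  currentGraph
}
```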

### Performance Optimization Patterns

```scala
import org.apache.spark.graphx._
import scala.reflect.ClassTag

// Optimize Pregel with proper caching and partitioning; the Pregel arguments come from the caller
def optimizedPregelAlgorithm[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    initialMessage: A,
    vertexProgram: (VertexId, VD, A) => VD,
    messageFunction: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeFunction: (A, A) => A
): Graph[VD, ED] = {

  val optimizedGraph = graph
    .partitionBy(PartitionStrategy.EdgePartition2D) // bounds vertex replication by 2 * sqrt(numPartitions)
    .cache() // reused by every superstep

  val result = optimizedGraph.pregel(
    initialMsg = initialMessage,
    maxIterations = 50 // Guard against algorithms that never stop sending messages
  )(
    vprog = vertexProgram,
    sendMsg = messageFunction,
    mergeMsg = mergeFunction
  )

  // Clean up the cached partitioned input
  optimizedGraph.unpersist(blocking = false)
  result.cache() // Cache result if it will be reused
}

// Use TripletFields so only the vertex attributes that sendMsg reads are shipped to edges
def efficientMessagePassing[ED: ClassTag](
    graph: Graph[Double, ED]
): VertexRDD[Double] = {

  graph.aggregateMessages[Double](
    sendMsg = ctx => ctx.sendToDst(ctx.srcAttr), // reads only the source attribute
    mergeMsg = (a, b) => a + b,
    tripletFields = TripletFields.Src // Only need source attributes
  )
}
```

## Pregel vs. Other GraphX Operations

### When to Use Pregel

- **Iterative algorithms** that require multiple passes through the graph
- **Custom algorithms** not available in the GraphX library
- **Vertex-centric computation** where logic is naturally expressed per-vertex
- **Message-passing patterns** where vertices communicate with neighbors

### When to Use Alternatives

```scala
// Assumes `graph: Graph[Double, ED]` for some edge type ED

// Use aggregateMessages for single-pass message aggregation
// (this particular example computes the same counts as the built-in graph.degrees)
val degrees = graph.aggregateMessages[Int](
  sendMsg = ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) },
  mergeMsg = (a, b) => a + b
)

// Use mapVertices/mapTriplets for simple transformations
val maxValue = graph.vertices.map(_._2).reduce((a, b) => math.max(a, b))
val normalizedGraph = graph.mapVertices((id, attr) => attr / maxValue)

// Use the built-in GraphOps algorithms for common operations
val components = graph.connectedComponents() // Tested built-in (itself implemented with Pregel)
val pageRanks = graph.pageRank(0.001)        // Dynamic PageRank: runs until rank changes fall below 0.001
```

### Pregel Execution Model

```scala
import org.apache.spark.graphx._
import scala.reflect.ClassTag

// Pregel execution, simplified:
// 0. Setup: vprog runs on every vertex with initialMsg, then sendMsg runs on every edge
// Each iteration then performs:
// 1. Vertex Program: vertices that received messages update their state
// 2. Send Messages: sendMsg runs on edges next to those vertices (filtered by activeDirection)
// 3. Message Aggregation: mergeMsg combines messages addressed to the same vertex
// 4. Check Convergence: stop when no messages were sent or maxIterations is reached

// One iteration sketched with public GraphX operations; unlike Pregel itself,
// it runs sendMsg on every edge instead of filtering by activeDirection
def pregelIteration[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    messages: VertexRDD[A],
    vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A
): (Graph[VD, ED], VertexRDD[A]) = {

  // Phase 1: Apply vertex program to vertices that received a message;
  // vertices without a message keep their attribute
  val newGraph = graph.joinVertices(messages)(vprog)

  // Phases 2-3: Send messages for the next iteration and merge them per vertex
  val newMessages = newGraph.aggregateMessages[A](
    sendMsg = ctx => sendMsg(ctx.toEdgeTriplet).foreach {
      case (vid, msg) => if (vid == ctx.srcId) ctx.sendToSrc(msg) else ctx.sendToDst(msg)
    },
    mergeMsg = mergeMsg
  )

  (newGraph, newMessages)
}
```
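
The following sketch is a single-machine model of the whole loop in plain Scala, with no Spark dependency, to make the superstep semantics concrete. `localPregel`, `Triplet`, and `ActiveDir` are hypothetical names for this sketch, not GraphX APIs. The model follows the behavior described above:

1. Every vertex runs the vertex program once with `initialMsg`.
2. In the first round, `sendMsg` runs on every edge.
3. After that, `sendMsg` runs only on the edges that `activeDirection` selects, relative to the vertices that just received messages.

The model may be handy for reasoning about small examples. It is not a substitute for the distributed implementation.

```scala
import scala.collection.mutable

// Hypothetical local types for this sketch (not GraphX classes)
final case class Triplet[VD, ED](srcId: Long, dstId: Long, srcAttr: VD, dstAttr: VD, attr: ED)

sealed trait ActiveDir
object ActiveDir {
  case object In extends ActiveDir
  case object Out extends ActiveDir
  case object Either extends ActiveDir
  case object Both extends ActiveDir
}

def localPregel[VD, ED, A](
    vertices: Map[Long, VD],
    edges: Seq[(Long, Long, ED)],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: ActiveDir = ActiveDir.Either)(
    vprog: (Long, VD, A) => VD,
    sendMsg: Triplet[VD, ED] => Iterator[(Long, A)],
    mergeMsg: (A, A) => A): Map[Long, VD] = {

  // Run sendMsg on the selected edges and merge messages per target vertex.
  // `active` is None in the first round, when every edge runs.
  def messagesFrom(g: Map[Long, VD], active: Option[Set[Long]]): Map[Long, A] = {
    val acc = mutable.Map.empty[Long, A]
    for ((src, dst, attr) <- edges) {
      val runs = active.forall { act =>
        activeDirection match {
          case ActiveDir.Out    => act(src)
          case ActiveDir.In     => act(dst)
          case ActiveDir.Either => act(src) || act(dst)
          case ActiveDir.Both   => act(src) && act(dst)
        }
      }
      if (runs)
        sendMsg(Triplet(src, dst, g(src), g(dst), attr)).foreach { case (id, m) =>
          acc(id) = acc.get(id).fold(m)(mergeMsg(_, m))
        }
    }
    acc.toMap
  }

  // Before the loop: every vertex runs vprog with initialMsg, then every edge runs sendMsg
  var g = vertices.map { case (id, v) => id -> vprog(id, v, initialMsg) }
  var messages = messagesFrom(g, None)
  var i = 0
  while (messages.nonEmpty && i < maxIterations) {
    // Only vertices that received a message run vprog; the others keep their value
    g = g.map { case (id, v) => id -> messages.get(id).fold(v)(vprog(id, v, _)) }
    messages = messagesFrom(g, Some(messages.keySet))
    i += 1
  }
  g
}

// Usage: single-source shortest paths from vertex 1 on a small weighted graph
val inf = Double.PositiveInfinity
val dist = localPregel(
  Map(1L -> 0.0, 2L -> inf, 3L -> inf, 4L -> inf),
  Seq((1L, 2L, 1.0), (2L, 3L, 2.0), (1L, 3L, 5.0), (3L, 4L, 1.0)),
  inf)(
  (_, d, m) => math.min(d, m),
  t => if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr)) else Iterator.empty,
  (a, b) => math.min(a, b))
// dist(3L) == 3.0 via 1 -> 2 -> 3, and dist(4L) == 4.0
```

Running the same example with `maxIterations = 1` stops after a single superstep. At that point vertex 3 still holds the direct-edge distance 5.0 and vertex 4 is still unreachable. This shows why a low iteration cap can return unconverged results.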