or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-graph-operations.md graph-algorithms.md graph-analytics.md index.md loading-utilities.md message-passing.md rdd-abstractions.md

message-passing.mddocs/

0

# Message Passing and Pregel

1

2

A bulk synchronous message-passing framework for implementing custom graph algorithms, using a vertex-centric programming model with efficient distributed computation.

3

4

## Capabilities

5

6

### Core Message Aggregation

7

8

The fundamental message passing operation that enables vertex-centric computation patterns.

9

10

```scala { .api }

11

/**

12

* Aggregate messages sent along edges to compute new vertex values

13

* Core operation for implementing graph algorithms using message passing

14

* @param sendMsg Function defining what messages to send along each edge

15

* @param mergeMsg Function to combine multiple messages received at same vertex

16

* @param tripletFields Optional field specification for performance optimization

17

* @return VertexRDD containing aggregated messages for each vertex

18

*/

19

def aggregateMessages[A: ClassTag](

20

sendMsg: EdgeContext[VD, ED, A] => Unit,

21

mergeMsg: (A, A) => A,

22

tripletFields: TripletFields = TripletFields.All

23

): VertexRDD[A]

24

```

25

26

**Usage Examples:**

27

28

```scala

29

import org.apache.spark.graphx._

30

31

// Degree of every vertex: each edge contributes 1 to both of its endpoints
val neighborCounts = graph.aggregateMessages[Int](
  ctx => {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  // Combine by adding up the per-edge contributions
  _ + _
)

// Gather the attributes of adjacent vertices
val neighborAttributes = graph.aggregateMessages[List[String]](
  ctx => {
    // Each endpoint receives the attribute of the vertex at the other end
    ctx.sendToSrc(List(ctx.dstAttr))
    ctx.sendToDst(List(ctx.srcAttr))
  },
  // Combine by list concatenation
  _ ++ _
)

// Sum of incident edge weights (edge attributes are assumed to be numeric)
val weightedSums = graph.aggregateMessages[Double](
  ctx => {
    val edgeWeight = ctx.attr.toString.toDouble
    ctx.sendToSrc(edgeWeight)
    ctx.sendToDst(edgeWeight)
  },
  _ + _
)

62

```

63

64

### EdgeContext API

65

66

Context object providing access to edge and vertex data during message sending.

67

68

```scala { .api }

69

/**

70

* Context for sending messages during aggregateMessages operation

71

* Provides access to edge data and adjacent vertex attributes

72

*/

73

abstract class EdgeContext[VD, ED, A] {

74

/** Source vertex ID */

75

def srcId: VertexId

76

77

/** Destination vertex ID */

78

def dstId: VertexId

79

80

/** Source vertex attribute */

81

def srcAttr: VD

82

83

/** Destination vertex attribute */

84

def dstAttr: VD

85

86

/** Edge attribute */

87

def attr: ED

88

89

/** Send message to source vertex */

90

def sendToSrc(msg: A): Unit

91

92

/** Send message to destination vertex */

93

def sendToDst(msg: A): Unit

94

95

/** Convert to EdgeTriplet for compatibility with other APIs */

96

def toEdgeTriplet: EdgeTriplet[VD, ED]

97

}

98

```

99

100

**Usage Examples:**

101

102

```scala

103

// Build a (description, weight, count) message from every field the context exposes
val complexMessages = graph.aggregateMessages[(String, Double, Int)](
  ctx => {
    // One description of the edge, shared by both outgoing messages
    val edgeInfo = s"${ctx.srcAttr}-${ctx.attr}-${ctx.dstAttr}"
    val edgeWeight = ctx.attr.toString.length.toDouble
    val edgeCount = 1

    // Each endpoint is told which vertex sits at the other end of the edge
    ctx.sendToSrc((s"From dst ${ctx.dstId}: $edgeInfo", edgeWeight, edgeCount))
    ctx.sendToDst((s"From src ${ctx.srcId}: $edgeInfo", edgeWeight, edgeCount))
  },
  // Join the descriptions with "; " and add up weights and counts
  {
    case ((textA, weightA, countA), (textB, weightB, countB)) =>
      (s"$textA; $textB", weightA + weightB, countA + countB)
  }
)

118

```

119

120

### Pregel Framework

121

122

Implementation of the Pregel bulk synchronous parallel model for vertex-centric graph computation.

123

124

```scala { .api }

125

/**

126

* Pregel bulk synchronous parallel computation framework

127

* Iteratively applies vertex program and sends messages until convergence

128

*/

129

object Pregel {

130

/**

131

* Run Pregel computation

132

* @param graph Initial graph

133

* @param initialMsg Message sent to all vertices in first iteration

134

* @param maxIterations Maximum number of iterations (default Int.MaxValue, i.e. effectively unbounded)

135

* @param activeDirection Direction of edges, relative to vertices that received a message in the previous round, along which sendMsg is run

136

* @param vprog Vertex program - how vertices update based on messages

137

* @param sendMsg Edge program - what messages to send along active edges

138

* @param mergeMsg Message merging function for multiple messages to same vertex

139

* @return Final graph after computation converges or max iterations reached

140

*/

141

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](

142

graph: Graph[VD, ED],

143

initialMsg: A,

144

maxIterations: Int = Int.MaxValue,

145

activeDirection: EdgeDirection = EdgeDirection.Either

146

)(

147

vprog: (VertexId, VD, A) => VD,

148

sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],

149

mergeMsg: (A, A) => A

150

): Graph[VD, ED]

151

}

152

```

153

154

**Usage Examples:**

155

156

```scala

157

/**
 * Single-source shortest paths expressed as a Pregel computation.
 *
 * Every vertex holds its best known distance from `sourceId`. A distance is relaxed
 * along outgoing edges whenever the path through the edge's source is shorter, and
 * the computation stops after at most 30 supersteps.
 *
 * @param graph    graph whose edge attributes are the edge weights
 * @param sourceId vertex from which distances are measured
 * @return graph whose vertex attributes are the computed distances
 */
def shortestPaths(graph: Graph[_, Double], sourceId: VertexId): Graph[Double, Double] = {
  // The source starts at distance 0; every other vertex starts out unreachable
  val seeded = graph.mapVertices { (vertexId, _) =>
    if (vertexId == sourceId) 0.0 else Double.PositiveInfinity
  }

  Pregel(
    seeded,
    initialMsg = Double.PositiveInfinity,
    maxIterations = 30,
    activeDirection = EdgeDirection.Out
  )(
    // Keep whichever is smaller: the stored distance or the incoming one
    vprog = (_, current, incoming) => math.min(current, incoming),

    // Offer the destination a route through the source only when it is an improvement
    sendMsg = edge => {
      val viaSource = edge.srcAttr + edge.attr
      if (viaSource < edge.dstAttr) Iterator((edge.dstId, viaSource)) else Iterator.empty
    },

    // Several offers for the same vertex collapse to the shortest one
    mergeMsg = (left, right) => math.min(left, right)
  )
}

186

187

/**
 * Connected components using Pregel.
 *
 * Every vertex starts with its own ID as its component label and repeatedly adopts
 * the smallest label seen among its neighbors, so each vertex ends up labelled with
 * the lowest vertex ID in its component.
 *
 * @param graph input graph; vertex attributes are ignored
 * @return graph whose vertex attribute is the component label
 */
def connectedComponents[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]
): Graph[VertexId, ED] = {
  // Initialize each vertex with its own ID as component
  val initialGraph = graph.mapVertices((id, _) => id)

  Pregel(
    initialGraph,
    initialMsg = Long.MaxValue,
    activeDirection = EdgeDirection.Either
  )(
    // Vertex program: adopt the smaller component ID
    vprog = (_, oldComp, newComp) => math.min(oldComp, newComp),

    // Send message: push the smaller label across the edge to the endpoint holding the
    // larger one. At most one endpoint can be improved per edge, so a plain Iterator
    // suffices and no mutable buffer (or scala.collection.mutable import) is needed.
    sendMsg = triplet => {
      if (triplet.srcAttr < triplet.dstAttr) {
        Iterator((triplet.dstId, triplet.srcAttr))
      } else if (triplet.dstAttr < triplet.srcAttr) {
        Iterator((triplet.srcId, triplet.dstAttr))
      } else {
        Iterator.empty
      }
    },

    // Merge messages: take the minimum component ID
    mergeMsg = (a, b) => math.min(a, b)
  )
}

220

```

221

222

### Advanced Message Passing Patterns

223

224

Complex message passing patterns for sophisticated algorithms.

225

226

```scala { .api }

227

/**
 * Pattern: multi-phase message passing.
 *
 * Phase 1 aggregates per-vertex statistics from incident edges. Phase 2 attaches those
 * statistics to the vertices and forwards a halved copy along each edge to the
 * destination vertex. The result pairs every original vertex attribute with the
 * combined statistics map.
 *
 * @param graph input graph; edge attributes must render (via toString) as parseable doubles
 * @return graph whose vertex attribute is (original attribute, statistics map)
 */
def multiPhaseAlgorithm[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED]
): Graph[(VD, Map[String, Double]), ED] = {
  type Stats = Map[String, Double]

  // Key-wise sum of two statistics maps; shared by both aggregation phases
  val sumByKey: (Stats, Stats) => Stats =
    (a, b) => a ++ b.map { case (k, v) => k -> (v + a.getOrElse(k, 0.0)) }

  // Phase 1: Collect local neighborhood information
  val localInfo = graph.aggregateMessages[Stats](
    triplet => {
      val info: Stats = Map("degree" -> 1.0, "edgeWeight" -> triplet.attr.toString.toDouble)
      triplet.sendToSrc(info)
      triplet.sendToDst(info)
    },
    sumByKey
  )

  // Phase 2: Propagate aggregated information
  val enrichedGraph = graph.outerJoinVertices(localInfo) { (_, attr, infoOpt) =>
    (attr, infoOpt.getOrElse(Map.empty[String, Double]))
  }

  val globalInfo = enrichedGraph.aggregateMessages[Stats](
    triplet => {
      // Send processed local information to neighbors
      val processedInfo = triplet.srcAttr._2.map { case (k, v) => k -> v / 2.0 }
      triplet.sendToDst(processedInfo)
    },
    sumByKey
  )

  // A tuple cannot be destructured in a plain lambda parameter list; a `case` function
  // literal is required to unpack the (attr, local) vertex attribute.
  // NOTE(review): local and global maps use the same keys, so for any vertex that
  // received phase-2 messages `++` replaces the local values with the global ones.
  enrichedGraph.outerJoinVertices(globalInfo) {
    case (_, (attr, local), globalOpt) =>
      (attr, local ++ globalOpt.getOrElse(Map.empty[String, Double]))
  }
}

260

```

261

262

### Performance Optimizations

263

264

Optimizations for message passing operations using TripletFields.

265

266

```scala { .api }

267

/**

268

* TripletFields specification for optimizing message passing performance

269

* Indicates which fields of EdgeTriplet/EdgeContext are accessed

270

*/

271

class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean)

272

273

object TripletFields {

274

/** No fields accessed - most efficient */

275

val None = new TripletFields(false, false, false)

276

277

/** Only edge attribute accessed */

278

val EdgeOnly = new TripletFields(false, false, true)

279

280

/** Source vertex and edge attributes accessed */

281

val Src = new TripletFields(true, false, true)

282

283

/** Destination vertex and edge attributes accessed */

284

val Dst = new TripletFields(false, true, true)

285

286

/** All fields accessed - default but least efficient */

287

val All = new TripletFields(true, true, true)

288

}

289

```

290

291

**Usage Examples:**

292

293

```scala

294

// Pass a TripletFields value naming exactly the fields the send function reads

// Pure counting: no vertex or edge attribute is read
val optimizedCount = graph.aggregateMessages[Int](
  ctx => {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  _ + _,
  TripletFields.None // Most efficient - no attribute access needed
)

// Summing weights: only the edge attribute is read
val edgeWeightSum = graph.aggregateMessages[Double](
  ctx => ctx.sendToSrc(ctx.attr.toString.toDouble),
  _ + _,
  TripletFields.EdgeOnly // Only edge attributes needed
)

// Describing sources: the source attribute and the edge attribute are read,
// the destination attribute is not
val sourceAttributeCollection = graph.aggregateMessages[List[String]](
  ctx => ctx.sendToDst(List(s"${ctx.srcAttr}-${ctx.attr}")),
  _ ++ _,
  TripletFields.Src // Source and edge fields needed
)

323

```

324

325

### Message Passing Best Practices

326

327

Patterns and practices for efficient message passing algorithms.

328

329

```scala

330

// Best Practice 1: Minimize message size
val efficientMessages = graph.aggregateMessages[Int](
  triplet => {
    // Send small messages - use a value derived from the ID instead of a full object.
    // The modulo is taken on the Long *before* narrowing to Int: narrowing first would
    // truncate IDs outside the Int range and yield wrong (possibly negative) values.
    triplet.sendToSrc((triplet.dstId % 1000).toInt)
  },
  (a, b) => a + b,
  TripletFields.None // Only vertex IDs are read, so no attributes are needed
)

// Best Practice 2: Use appropriate data structures for messages
val setMessages = graph.aggregateMessages[Set[VertexId]](
  triplet => {
    triplet.sendToSrc(Set(triplet.dstId))
  },
  (a, b) => a ++ b, // Set union is efficient for deduplication
  TripletFields.None // Only vertex IDs are read, so no attributes are needed
)

// Best Practice 3: Conditional message sending
val conditionalMessages = graph.aggregateMessages[Double](
  triplet => {
    // Only send messages when necessary; compute each length once
    val srcLength = triplet.srcAttr.toString.length
    if (srcLength > triplet.dstAttr.toString.length) {
      triplet.sendToDst(srcLength.toDouble)
    }
  },
  (a, b) => math.max(a, b)
)

357

358

/**
 * Best Practice 4: batch (iterative) processing pattern.
 *
 * Repeatedly aggregates messages and folds them back into the vertex attributes until
 * either a round produces no messages or `maxIter` rounds have run.
 *
 * @param initialGraph graph to start from
 * @param maxIter      upper bound on the number of rounds
 * @return the graph after the last completed round
 */
def iterativeMessagePassing[VD: ClassTag](
    initialGraph: Graph[VD, _],
    maxIter: Int
): Graph[VD, _] = {
  var graph: Graph[VD, _] = initialGraph
  var iteration = 0
  var converged = false

  while (!converged && iteration < maxIter) {
    // Cache the messages: they are consumed twice per round (count() below, then the
    // join), and without caching the aggregation would be recomputed for the join.
    val messages = graph.aggregateMessages[VD](
      triplet => {
        // Algorithm-specific message sending
        triplet.sendToSrc(triplet.dstAttr)
      },
      (a, b) => a // Algorithm-specific merging
    ).cache()

    // Check for convergence: a round that produced no messages ends the loop
    if (messages.count() == 0) {
      println(s"Converged after $iteration iterations")
      converged = true
    } else {
      // Update graph with new values; vertices without a message keep their attribute
      graph = graph.outerJoinVertices(messages) { (_, oldAttr, msgOpt) =>
        msgOpt.getOrElse(oldAttr)
      }
      iteration += 1
    }
  }

  graph
}

391

```

392

393

## Integration with GraphOps

394

395

Message passing operations are called directly on a graph: `aggregateMessages` is a method of `Graph`, and `pregel` is made available through `GraphOps`.

396

397

```scala

398

// aggregateMessages is called directly on the graph
val messages = graph.aggregateMessages[String](
  triplet => triplet.sendToSrc(triplet.dstAttr),
  (a, b) => s"$a,$b"
)

// Pregel access through GraphOps.
// This send function emits a message on every edge in every superstep, so the
// computation never runs out of messages; an explicit maxIterations bound is required
// (the default is Int.MaxValue).
val pregelResult = graph.pregel("initial", maxIterations = 3)(
  (id, attr, msg) => s"$attr-$msg",
  triplet => Iterator((triplet.dstId, triplet.srcAttr)),
  (a, b) => s"$a;$b"
)

410

```