or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application-launcher.mdcore-engine.mdgraph-processing.mdindex.mdmachine-learning.mdsql-dataframes.mdstream-processing.md

graph-processing.mddocs/

0

# Graph Processing

1

2

Spark GraphX is a graph computation framework built on top of Spark Core. It provides APIs for expressing graph computations over user-defined graphs, which are modeled with the property graph abstraction.

3

4

## Graph

5

6

The fundamental abstraction in GraphX is the property graph: a directed multigraph with properties attached to each vertex and edge.

7

8

```scala { .api }

/**
 * A directed multigraph with an attribute of type `VD` on every vertex and `ED` on every edge.
 *
 * Members marked [GraphOps] are not declared on `Graph` itself: they are defined in
 * `GraphOps` and become available on any `Graph` through the implicit conversion
 * `Graph.graphToGraphOps` in the companion object.
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {

  // ---- Basic graph properties ----

  /** The vertices of the graph as (VertexId, attribute) pairs. */
  def vertices: VertexRDD[VD]

  /** The edges of the graph, each carrying its attribute. */
  def edges: EdgeRDD[ED]

  /** Every edge together with the attributes of its source and destination vertices. */
  def triplets: RDD[EdgeTriplet[VD, ED]]

  // ---- Persistence operations ----

  /** Persists the vertices and edges at `newLevel` (MEMORY_ONLY unless specified). */
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  /** Persists the graph at the default MEMORY_ONLY level. */
  def cache(): Graph[VD, ED]

  def unpersist(blocking: Boolean = false): Graph[VD, ED]

  /** Marks the graph for checkpointing; a checkpoint directory must be set on the SparkContext. */
  def checkpoint(): Unit

  def isCheckpointed: Boolean

  def getCheckpointFiles: Seq[String]

  // ---- Structural operations [GraphOps] ----

  def numEdges: Long

  def numVertices: Long

  /** In-degree per vertex; vertices with no incoming edges are absent. */
  def inDegrees: VertexRDD[Int]

  /** Out-degree per vertex; vertices with no outgoing edges are absent. */
  def outDegrees: VertexRDD[Int]

  /** Total degree per vertex; vertices with no edges are absent. */
  def degrees: VertexRDD[Int]

  // ---- Graph transformations (attributes change, structure does not) ----

  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

  /** As above; `tripletFields` declares which attributes `map` reads so unused ones are not shipped. */
  def mapTriplets[ED2: ClassTag](
      map: EdgeTriplet[VD, ED] => ED2,
      tripletFields: TripletFields): Graph[VD, ED2]

  // ---- Structural transformations ----

  /** Reverses the direction of every edge. */
  def reverse: Graph[VD, ED]

  /** Keeps vertices satisfying `vpred` and edges satisfying `epred` whose endpoints both survive. */
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]

  /** Restricts this graph to the vertices and edges also present in `other`, keeping this graph's attributes. */
  def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]

  /** Merges parallel edges with `merge`; the graph must first be partitioned with `partitionBy`. */
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // ---- Neighborhood operations ----

  /** [GraphOps] Neighbor ids per vertex; EdgeDirection.Both is rejected (use Either). */
  def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

  /** [GraphOps] Neighbor ids and attributes per vertex; EdgeDirection.Both is rejected (use Either). */
  def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

  /**
   * Runs `sendMsg` on each edge and combines messages addressed to the same vertex with
   * `mergeMsg`. Vertices that receive no message are absent from the result.
   */
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All): VertexRDD[A]

  /** Internal to GraphX (private[graphx]); not callable from user code. */
  def aggregateMessagesWithActiveSet[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields,
      activeSetOpt: Option[(VertexRDD[_], EdgeDirection)]): VertexRDD[A]

  // ---- Join operations ----

  /** [GraphOps] Applies `mapFunc` to vertices with an entry in `table`; others keep their attribute. */
  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]

  /** Applies `mapFunc` to every vertex, passing None when `other` has no entry; may change the vertex type. */
  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]

  // ---- Partitioning ----

  /** Repartitions the edges according to `partitionStrategy`. */
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

  /** Repartitions the edges into `numPartitions` partitions according to `partitionStrategy`. */
  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]

  // ---- Pregel API [GraphOps] ----

  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED]

  // ---- Graph algorithms [GraphOps] ----

  /** Dynamic PageRank, iterating until ranks change by less than `tol`. */
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

  /** PageRank personalized to the source vertex `src`. */
  def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

  /** PageRank for exactly `numIter` iterations. */
  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

  /** Labels each vertex with the lowest vertex id in its connected component. */
  def connectedComponents(): Graph[VertexId, ED]

  def connectedComponents(maxIterations: Int): Graph[VertexId, ED]

  /** Labels each vertex with the lowest vertex id in its strongly connected component. */
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

  /** Number of triangles passing through each vertex. */
  def triangleCount(): Graph[Int, ED]
}

```

83

84

### Graph Companion Object

85

86

```scala { .api }

object Graph {

  /**
   * Builds a graph from vertex and edge RDDs.
   *
   * Duplicate vertices are resolved arbitrarily. Vertices referenced by an edge but
   * missing from `vertices` receive `defaultVertexAttr`.
   */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  /** Builds a graph from edges alone; every vertex referenced by an edge gets `defaultValue`. */
  def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  /**
   * Builds a graph from (srcId, dstId) pairs. Each edge gets attribute 1.
   *
   * @param uniqueEdges if set, identical edges are merged (their attributes summed) after
   *                    partitioning with the given strategy; if None, duplicates stay separate
   */
  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

  /** Makes every GraphOps method (degrees, pregel, pageRank, ...) callable directly on a Graph. */
  implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]): GraphOps[VD, ED]
}

```

111

112

### Usage Examples

113

114

```scala

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumes an active SparkContext named `sc` (for example, inside spark-shell).

// Create the vertices RDD: (id, (username, position))
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array(
  (3L, ("rxin", "student")),
  (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")),
  (2L, ("istoica", "prof"))
))

// Create the edges RDD: directed edges labelled with the relationship
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(3L, 7L, "collab"),
  Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"),
  Edge(5L, 7L, "pi")
))

// Build the property graph
val graph = Graph(users, relationships)

// Basic operations
println(s"Number of vertices: ${graph.numVertices}")
println(s"Number of edges: ${graph.numEdges}")

// Degree views (VertexRDD[Int]); vertices without matching edges are absent from them
val degrees = graph.degrees
val inDegrees = graph.inDegrees
val outDegrees = graph.outDegrees

// Transform vertex and edge attributes (the graph structure is unchanged)
val newGraph = graph.mapVertices { case (id, (name, pos)) => (name, pos, id) }
val edgeGraph = graph.mapEdges(e => e.attr.toUpperCase)

// Keep only "prof" vertices and the edges between them
val professorsGraph = graph.subgraph(vpred = (id, attr) => attr._2 == "prof")

// Graph algorithms
val ranks = graph.pageRank(0.0001).vertices
val connectedComponents = graph.connectedComponents().vertices

```

157

158

## VertexRDD and EdgeRDD

159

160

### VertexRDD

161

162

```scala { .api }

/**
 * An RDD of (VertexId, attribute) pairs that keeps at most one entry per vertex id and
 * is indexed, so that joins with other VertexRDDs sharing that index are cheap.
 */
abstract class VertexRDD[VD: ClassTag](
    sc: SparkContext,
    deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps) {

  // ---- Efficient join operations ----

  /** Left outer join with any pair RDD; `f` receives None for vertices missing from `other`. */
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])
      (f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]

  /** Like leftJoin, but `other` must share this RDD's index (so no shuffle is needed). */
  def leftZipJoin[VD2: ClassTag, VD3: ClassTag](other: VertexRDD[VD2])
      (f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]

  /** Inner join: only vertices present in both sides appear in the result. */
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

  /** Like innerJoin, but `other` must share this RDD's index. */
  def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

  // ---- Aggregation operations ----

  /** Combines `messages` with the same id using `reduceFunc`, returning a result co-indexed with this RDD. */
  def aggregateUsingIndex[VD2: ClassTag](
      messages: RDD[(VertexId, VD2)],
      reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]

  // ---- Transformation operations (the index is preserved) ----

  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]

  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

  /** For vertices present in both RDDs, keeps only those whose values differ, taking the value from `other`. */
  def diff(other: VertexRDD[VD]): VertexRDD[VD]

  // ---- Caching and persistence ----

  def cache(): VertexRDD[VD]

  def persist(newLevel: StorageLevel): VertexRDD[VD]

  def unpersist(blocking: Boolean = false): VertexRDD[VD]

  // ---- Structural operations ----

  /** Rebuilds the index so it contains only the currently visible vertices. */
  def reindex(): VertexRDD[VD]

  /** Internal to GraphX (private[graphx]); not callable from user code. */
  def withTargetStorageLevel(targetStorageLevel: StorageLevel): VertexRDD[VD]
}

object VertexRDD {

  /** Builds a standalone VertexRDD from pairs; duplicate ids are resolved arbitrarily. */
  def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD]

  /** Builds a VertexRDD holding every vertex referenced by `edges`, each set to `defaultVal`. */
  def fromEdges[VD: ClassTag](edges: EdgeRDD[_], numPartitions: Int, defaultVal: VD): VertexRDD[VD]
}

```

201

202

### EdgeRDD

203

204

```scala { .api }

/** An RDD of edges, stored in edge partitions for efficient graph operations. */
abstract class EdgeRDD[ED: ClassTag](
    sc: SparkContext,
    deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps) {

  // ---- Core operations ----

  /** Transforms each edge attribute; edge endpoints and partitioning are preserved. */
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

  /** Reverses the direction of every edge. */
  def reverse: EdgeRDD[ED]

  /**
   * Inner-joins with another EdgeRDD on (srcId, dstId). Both sides must be partitioned
   * with the same PartitionStrategy. `f` receives the srcId, dstId, this edge's
   * attribute and the other edge's attribute.
   */
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])
      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]

  // Filtering edges (optionally by vertex attributes) is done at the graph level with
  // Graph.subgraph; EdgeRDD has no public filter(epred, vpred).

  // ---- Caching and persistence ----

  def cache(): EdgeRDD[ED]

  def persist(newLevel: StorageLevel): EdgeRDD[ED]

  def unpersist(blocking: Boolean = false): EdgeRDD[ED]

  // ---- Operations inherited from RDD ----

  def count(): Long

  def mapPartitionsWithIndex[ED2: ClassTag](
      f: (Int, Iterator[Edge[ED]]) => Iterator[ED2],
      preservesPartitioning: Boolean = false): RDD[ED2]

  /** Internal to GraphX (private[graphx]); not callable from user code. */
  def withTargetStorageLevel(targetStorageLevel: StorageLevel): EdgeRDD[ED]
}

object EdgeRDD {

  /** Builds an EdgeRDD from an RDD of edges; VD is the vertex attribute type of the graph it will back. */
  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED]
}

```

232

233

## Core Types

234

235

### Basic Types

236

237

```scala { .api }

// Vertex identifier type (defined in the org.apache.spark.graphx package object)
type VertexId = Long

// Partition index type used by PartitionStrategy (also in the graphx package object)
type PartitionID = Int

/**
 * A directed edge from `srcId` to `dstId` carrying attribute `attr`.
 * The fields are mutable and defaulted so that EdgeTriplet can extend Edge and GraphX
 * can reuse edge objects.
 */
case class Edge[ED](
    var srcId: VertexId = 0,
    var dstId: VertexId = 0,
    var attr: ED = null.asInstanceOf[ED]) extends Serializable {

  /** Given one endpoint `vid` of this edge, returns the other endpoint. */
  def otherVertexId(vid: VertexId): VertexId = if (srcId == vid) dstId else srcId

  /** Out if `vid` is the source of this edge, otherwise In (`vid` should be an endpoint). */
  def relativeDirection(vid: VertexId): EdgeDirection = {
    if (vid == srcId) EdgeDirection.Out else EdgeDirection.In
  }
}

/** An edge together with the attributes of its source and destination vertices. */
class EdgeTriplet[VD, ED] extends Edge[ED] {
  var srcAttr: VD = _
  var dstAttr: VD = _

  /** Attribute of the endpoint that is not `vid`. */
  def otherVertexAttr(vid: VertexId): VD = if (vid == srcId) dstAttr else srcAttr

  /** Attribute of the endpoint `vid`. */
  def vertexAttr(vid: VertexId): VD = if (vid == srcId) srcAttr else dstAttr

  def toTuple: ((VertexId, VD), (VertexId, VD), ED) = ((srcId, srcAttr), (dstId, dstAttr), attr)

  override def toString: String = s"(($srcId,$srcAttr),($dstId,$dstAttr),$attr)"
}

/**
 * Direction of edges relative to a vertex. This is a class with four fixed instances
 * in its companion object, not a scala.Enumeration.
 */
class EdgeDirection private (private val name: String) extends Serializable {

  /** The opposite direction: In <-> Out; Either and Both are their own reverse. */
  def reverse: EdgeDirection = this match {
    case EdgeDirection.In => EdgeDirection.Out
    case EdgeDirection.Out => EdgeDirection.In
    case EdgeDirection.Either => EdgeDirection.Either
    case EdgeDirection.Both => EdgeDirection.Both
  }

  override def toString: String = "EdgeDirection." + name

  override def equals(o: Any): Boolean = o match {
    case other: EdgeDirection => other.name == name
    case _ => false
  }

  override def hashCode: Int = name.hashCode
}

object EdgeDirection {
  /** Edges arriving at a vertex. */
  final val In: EdgeDirection = new EdgeDirection("In")

  /** Edges originating from a vertex. */
  final val Out: EdgeDirection = new EdgeDirection("Out")

  /** Edges originating from or arriving at a vertex. */
  final val Either: EdgeDirection = new EdgeDirection("Either")

  /** Edges originating from and arriving at a vertex. */
  final val Both: EdgeDirection = new EdgeDirection("Both")
}

/**
 * Passed to the `sendMsg` function of Graph.aggregateMessages: exposes one edge (ids and
 * attributes) and lets the caller send messages of type A to either endpoint.
 */
abstract class EdgeContext[VD, ED, A] {
  def srcId: VertexId
  def dstId: VertexId
  def srcAttr: VD
  def dstAttr: VD
  def attr: ED
  def sendToSrc(msg: A): Unit
  def sendToDst(msg: A): Unit
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}

/**
 * Declares which triplet fields a user function reads, so GraphX ships only those.
 * (In Spark this is a Java class; the no-argument constructor means "all fields".)
 */
class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean) extends Serializable {
  def this() = this(true, true, true)
}

// These are the only predefined constants. Note that Src and Dst also include the
// edge attribute. Other combinations can be built with the constructor.
object TripletFields {
  /** No vertex or edge attributes (structure only). */
  val None = new TripletFields(false, false, false)

  /** Edge attribute only. */
  val EdgeOnly = new TripletFields(false, false, true)

  /** Source vertex attribute and edge attribute. */
  val Src = new TripletFields(true, false, true)

  /** Destination vertex attribute and edge attribute. */
  val Dst = new TripletFields(false, true, true)

  /** All attributes. */
  val All = new TripletFields(true, true, true)
}

```

296

297

### Partitioning Strategies

298

299

```scala { .api }

/** Decides which edge partition each edge is placed in. Custom strategies implement this trait. */
trait PartitionStrategy extends Serializable {
  /** Returns the partition for the edge (src, dst) when there are `numParts` partitions. */
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

// Built-in strategies (implementations omitted).
object PartitionStrategy {

  /** Hashes (src, dst), so all same-direction edges between two vertices share a partition. */
  case object RandomVertexCut extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
  }

  /** Uses only the source vertex id, so all edges with the same source share a partition. */
  case object EdgePartition1D extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
  }

  /** 2D partitioning of the adjacency matrix, bounding vertex replication by about 2 * sqrt(numParts). */
  case object EdgePartition2D extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
  }

  /** Like RandomVertexCut but direction-insensitive: all edges between two vertices share a partition. */
  case object CanonicalRandomVertexCut extends PartitionStrategy {
    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
  }

  /** Looks up a built-in strategy by name (e.g. "EdgePartition2D"); unknown names raise IllegalArgumentException. */
  def fromString(s: String): PartitionStrategy
}

```

324

325

### Usage Examples

326

327

```scala

import org.apache.spark.graphx._

// `users` and `relationships` from the first example are plain RDDs. The VertexRDD and
// EdgeRDD operations are available on `graph.vertices` and `graph.edges`.

// Working with VertexRDD: keep only the username
val newUsers: VertexRDD[String] = graph.vertices.mapValues(attr => attr._1)

// Attach each user's degree; vertices with no edges are missing from `degrees` and get 0
val joinedVertices = graph.vertices.leftJoin(degrees) { (id, attr, degOpt) =>
  (attr._1, attr._2, degOpt.getOrElse(0))
}

// Working with EdgeRDD
val upperCaseEdges = graph.edges.mapValues(e => e.attr.toUpperCase)
val reversedEdges = graph.edges.reverse

// Custom partitioning
val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)

// Count incident edges per vertex. TripletFields.None tells GraphX that the send
// function reads no vertex or edge attributes, so none need to be shipped.
val messages = graph.aggregateMessages[Int](
  ctx => {
    ctx.sendToSrc(1)
    ctx.sendToDst(1)
  },
  (a: Int, b: Int) => a + b,
  TripletFields.None
)

```

354

355

## Graph Algorithms

356

357

### PageRank

358

359

```scala { .api }

/**
 * PageRank implementations. Every variant returns a graph whose vertex attributes are the
 * ranks and whose edge attributes are the normalized edge weights.
 * `resetProb` is the random reset probability (alpha).
 */
object PageRank {

  /** Runs exactly `numIter` iterations. */
  def run[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      numIter: Int,
      resetProb: Double = 0.15): Graph[Double, Double]

  /** Iterates until ranks change by less than `tol` (smaller tol = more accurate, slower). */
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      tol: Double,
      resetProb: Double = 0.15): Graph[Double, Double]

  /** Fixed-iteration PageRank; personalized to `srcId` when it is defined. */
  def runWithOptions[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      numIter: Int,
      resetProb: Double = 0.15,
      srcId: Option[VertexId] = None): Graph[Double, Double]

  /** Convergence-based PageRank; personalized to `srcId` when it is defined. */
  def runUntilConvergenceWithOptions[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      tol: Double,
      resetProb: Double = 0.15,
      srcId: Option[VertexId] = None): Graph[Double, Double]
}

```

376

377

### Connected Components

378

379

```scala { .api }

/** Labels every vertex with the lowest vertex id in its connected component (edge direction is ignored). */
object ConnectedComponents {

  /** Iterates until no labels change. */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]

  /** Stops after at most `maxIterations` iterations. */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxIterations: Int): Graph[VertexId, ED]
}

```

385

386

### Strongly Connected Components

387

388

```scala { .api }

/** Labels every vertex with the lowest vertex id in its strongly connected component. */
object StronglyConnectedComponents {

  /** `numIter` bounds the number of iterations. */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}

```

393

394

### Triangle Count

395

396

```scala { .api }

/** Counts the triangles passing through each vertex. */
object TriangleCount {

  /** Works on any graph: it removes self-loops and canonicalizes edges before counting. */
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]

  /**
   * Skips canonicalization. The input must already have no self-loops and at most one
   * edge per vertex pair, oriented from the smaller to the larger vertex id.
   */
  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}

```

402

403

### Usage Examples

404

405

```scala

import org.apache.spark.graphx.lib._

// `graph` and `users` are the values built in the first usage example on this page.

// PageRank: a fixed 10 iterations, or iterate until ranks move by less than `tol`
val ranks = PageRank.run(graph, numIter = 10)
val convergedRanks = PageRank.runUntilConvergence(graph, tol = 0.0001)

// Connected Components: each vertex gets the lowest vertex id in its component
val cc = ConnectedComponents.run(graph)
val ccLimited = ConnectedComponents.run(graph, maxIterations = 10)

// Triangle Count: triangles passing through each vertex
val triCounts = TriangleCount.run(graph)

// Join the PageRank scores back onto the original vertex attributes
val ranksByUsername = users.join(ranks.vertices).map {
  case (id, ((username, title), rank)) => (username, rank)
}

```

424

425

## GraphLoader

426

427

Utility for loading graphs from files.

428

429

```scala { .api }

object GraphLoader {

  /**
   * Loads a graph from an edge-list text file: each line holds a source id and a target
   * id separated by whitespace, and lines starting with '#' are skipped. Every vertex and
   * edge attribute in the result is 1.
   *
   * @param canonicalOrientation if true, each edge is oriented from the smaller to the larger vertex id
   * @param numEdgePartitions    number of edge partitions; -1 uses the default parallelism
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[Int, Int]
}

```

438

439

### Usage Examples

440

441

```scala

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

// Load a graph from an edge list file: one "srcId dstId" pair of integers per line,
// separated by whitespace; lines starting with '#' are ignored. Example contents:
//   1 2
//   2 3
//   3 1
// Every vertex and edge attribute of the loaded graph is 1.
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")

// Orient edges from the smaller to the larger id, use 4 edge partitions, and let
// edge partitions spill to disk
val graphWithCustomStorage = GraphLoader.edgeListFile(
  sc,
  "path/to/edges.txt",
  canonicalOrientation = true,
  numEdgePartitions = 4,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_ONLY
)

```

461

462

## Pregel API

463

464

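The Pregel API provides a vertex-centric, iterative approach to graph computation. Execution begins by running the vertex program on every vertex with `initialMsg` and calling `sendMsg` on every edge. Each subsequent superstep first combines the messages addressed to each vertex with `mergeMsg` and runs the vertex program on every vertex that received a message. It then calls `sendMsg` only on the edges adjacent to those vertices, as selected by `activeDirection`. The computation stops when no messages are sent or after `maxIterations` supersteps. A `sendMsg` that emits messages unconditionally therefore needs an explicit `maxIterations`.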

465

466

### Usage Examples

467

468

```scala

import org.apache.spark.graphx._
import scala.math.min

// Example: single-source shortest paths (SSSP) using Pregel.
// Assumes `graph` has numeric edge weights (e.g. the Graph[Int, Int] returned by
// GraphLoader.edgeListFile) and that `sourceId` is one of its vertices.
val sourceId: VertexId = 42L

// Distance 0 at the source, "not yet reached" (infinity) everywhere else
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program: keep the shorter of the current and the proposed distance
  (id, dist, newDist) => min(dist, newDist),

  // Send message: offer dst a shorter path through src when one exists. Sending
  // nothing otherwise is what lets the computation terminate.
  triplet => {
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    }
  },

  // Merge messages: take the minimum distance
  (a, b) => min(a, b)
)

// Example: compute vertex degrees with a single Pregel superstep.
// Vertices start at 0. maxIterations = 1 is required because sendMsg fires on every
// edge unconditionally, so without a cap Pregel would never stop. (graph.degrees gives
// the same counts directly, but omits isolated vertices, which stay 0 here.)
val degreeGraph = graph
  .mapVertices((_, _) => 0)
  .pregel(0, maxIterations = 1)(
    // Vertex program: add the summed incoming messages to the running total
    (id, oldSum, msgSum) => oldSum + msgSum,

    // Send message: send 1 to both endpoints of every edge
    triplet => Iterator((triplet.srcId, 1), (triplet.dstId, 1)),

    // Merge messages: sum them
    (a, b) => a + b
  )

```