or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-engine.mdgraph-processing.mdindex.mdmachine-learning.mdsql-dataframes.mdstream-processing.md

graph-processing.mddocs/

0

# Graph Processing

1

2

GraphX is Apache Spark's API for graphs and graph-parallel computation. It extends the Spark RDD abstraction with the Resilient Distributed Property Graph: a directed multigraph with user-defined properties attached to each vertex and edge.

3

4

## Package Information

5

6

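Graph processing functionality is available through the imports below. The synthetic-graph generators additionally require `import org.apache.spark.graphx.util.GraphGenerators`.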

7

8

```scala

9

import org.apache.spark.graphx._

10

import org.apache.spark.graphx.lib._

11

```

12

13

## Basic Usage

14

15

```scala

16

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumes an active SparkContext named `sc` (e.g. inside spark-shell).

// Vertex RDD: (vertex id, user name) pairs
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"),
  (2L, "Bob"),
  (3L, "Charlie"),
  (4L, "David")
))

// Edge RDD: directed edges labelled with the relationship type
val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "friend"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 4L, "colleague"),
  Edge(4L, 1L, "friend")
))

// Build the property graph; cache it because several actions below reuse it
val graph = Graph(vertices, edges).cache()

// Basic graph operations
println(s"Number of vertices: ${graph.vertices.count()}")
println(s"Number of edges: ${graph.edges.count()}")

// Graph algorithms
val ranks = graph.pageRank(0.0001).vertices
val connectedComponents = graph.connectedComponents().vertices

// Join the PageRank scores back to the original vertex names
val ranksByUsername = vertices.join(ranks).map {
  case (_, (username, rank)) => (username, rank)
}
ranksByUsername.collect().foreach(println)

// Each vertex is labelled with the lowest vertex id in its connected component
val componentsByUsername = vertices.join(connectedComponents).map {
  case (_, (username, componentId)) => (username, componentId)
}
componentsByUsername.collect().foreach(println)

52

```

53

54

## Capabilities

55

56

### Core Graph Types

57

58

#### Graph

59

60

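The fundamental graph abstraction: an immutable property graph with typed vertex (`VD`) and edge (`ED`) attributes. Convenience operators and built-in algorithms (degrees, `joinVertices`, `pageRank`, `pregel`, ...) are defined in `GraphOps` and are available on every `Graph` through an implicit conversion.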

61

62

```scala { .api }

63

/**
 * Immutable property graph: a directed multigraph whose vertices carry an attribute
 * of type `VD` and whose edges carry an attribute of type `ED`. Every operation
 * returns a new graph; the receiver is never modified.
 *
 * Members marked "(GraphOps)" are declared in `GraphOps` and are available on any
 * `Graph` through the implicit conversion `Graph.graphToGraphOps`.
 */
abstract class Graph[VD: ClassTag, ED: ClassTag] extends Serializable {
  // Graph structure
  def vertices: VertexRDD[VD]
  def edges: EdgeRDD[ED]
  // Each triplet pairs an edge with the attributes of its source and destination vertices
  def triplets: RDD[EdgeTriplet[VD, ED]]

  // Basic operations (GraphOps)
  def numEdges: Long
  def numVertices: Long
  def inDegrees: VertexRDD[Int]
  def outDegrees: VertexRDD[Int]
  def degrees: VertexRDD[Int]

  // Transformations (the graph structure is preserved; only attributes change)
  def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
  // `tripletFields` declares which triplet fields `map` reads, so unused vertex attributes need not be shipped
  def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2, tripletFields: TripletFields): Graph[VD, ED2]

  // Structural operations
  def reverse: Graph[VD, ED]
  def subgraph(
      epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
      vpred: (VertexId, VD) => Boolean = ((v, d) => true)): Graph[VD, ED]
  // Keeps only the vertices and edges that also appear in `other`, with this graph's attributes
  def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
  // Repartitions the edges according to a PartitionStrategy
  def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
  // Merges parallel edges; duplicates must be co-located, so call partitionBy first
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  // Join operations
  // (GraphOps) Vertices with no entry in `table` keep their current attribute
  def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]
  // Visits every vertex; the Option is None when `other` has no entry for that vertex
  def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
      (mapFunc: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]

  // Graph algorithms (GraphOps; implementations live in org.apache.spark.graphx.lib)
  def connectedComponents(): Graph[VertexId, ED]
  def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
  def triangleCount(): Graph[Int, ED]
  // Dynamic PageRank: iterates until ranks change by less than `tol`
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
  // Static PageRank: runs exactly `numIter` iterations
  def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

  // Aggregation: sendMsg runs per edge, mergeMsg combines messages bound for the same vertex
  def aggregateMessages[A: ClassTag](
      sendMsg: EdgeContext[VD, ED, A] => Unit,
      mergeMsg: (A, A) => A,
      tripletFields: TripletFields = TripletFields.All): VertexRDD[A]

  // Pregel API (GraphOps)
  def pregel[A: ClassTag](
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED]

  // Persistence
  def cache(): Graph[VD, ED]
  def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
  def unpersist(blocking: Boolean = true): Graph[VD, ED]
  // Drops only the cached vertices; useful in iterative jobs that reuse edges but rewrite vertex attributes
  def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

  // Checkpointing (requires SparkContext.setCheckpointDir to have been called)
  def checkpoint(): Unit
  def isCheckpointed: Boolean
  def getCheckpointFiles: Seq[String]
}

125

126

object Graph {
  /**
   * Builds a graph from vertex and edge RDDs.
   * Vertices referenced by `edges` but absent from `vertices` receive `defaultVertexAttr`;
   * duplicate entries for the same vertex id are resolved arbitrarily.
   */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  /** Builds a graph from edges alone; every vertex referenced by an edge gets `defaultValue`. */
  def fromEdges[VD: ClassTag, ED: ClassTag](
      edges: RDD[Edge[ED]],
      defaultValue: VD,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

  /**
   * Builds a graph from (srcId, dstId) pairs; every edge attribute is 1.
   * If `uniqueEdges` is given, edges are partitioned with that strategy and identical
   * edges are merged, summing their attributes; otherwise duplicates stay separate.
   */
  def fromEdgeTuples[VD: ClassTag](
      rawEdges: RDD[(VertexId, VertexId)],
      defaultValue: VD,
      uniqueEdges: Option[PartitionStrategy] = None,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int]

  /** Exposes the GraphOps operators (degrees, joinVertices, pageRank, pregel, ...) on every Graph. */
  implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]): GraphOps[VD, ED]
}

142

```

143

144

#### VertexRDD

145

146

A specialized `RDD[(VertexId, VD)]` for the vertices of a graph. Each vertex id occurs at most once, and a reusable index speeds up joins.

147

148

```scala { .api }

149

/**
 * An RDD[(VertexId, VD)] in which each vertex id occurs at most once, backed by a
 * reusable index so that joins between VertexRDDs sharing that index are fast.
 * No conversion method is needed: a VertexRDD already is an RDD[(VertexId, VD)].
 */
abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {
  // Attribute transformations that preserve the index
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

  // Join operations; the *Zip* variants require `other` to share this RDD's index
  def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])
      (f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def leftZipJoin[VD2: ClassTag, VD3: ClassTag](other: VertexRDD[VD2])
      (f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3]
  def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]
  def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])
      (f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

  // Aggregation: combines `messages` per vertex id, reusing this RDD's index
  def aggregateUsingIndex[VD2: ClassTag](messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2]

  // Set operations: vertices of this RDD whose ids are not in `other`
  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
  def minus(other: VertexRDD[VD]): VertexRDD[VD]
}

173

```

174

175

#### EdgeRDD

176

177

A specialized `RDD[Edge[ED]]` for the edges of a graph. Edges are organized in per-partition blocks laid out by a `PartitionStrategy`.

178

179

```scala { .api }

180

/**
 * An RDD[Edge[ED]] that stores edges in per-partition blocks laid out by a
 * PartitionStrategy. No conversion method is needed: an EdgeRDD already is an RDD[Edge[ED]].
 */
abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {
  // Attribute transformation that preserves the partitioned structure
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

  // Swaps the source and destination of every edge
  def reverse: EdgeRDD[ED]

  // Inner join with another EdgeRDD; both sides must use the same PartitionStrategy
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])
      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3]
}

191

```

192

193

#### Edge and EdgeTriplet

194

195

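Basic edge and vertex-identifier types. `EdgeTriplet` extends `Edge` with the attributes of its source and destination vertices.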

196

197

```scala { .api }

198

// A directed edge from srcId to dstId carrying an attribute of type ED
case class Edge[ED](
    var srcId: VertexId = 0,
    var dstId: VertexId = 0,
    var attr: ED = null.asInstanceOf[ED]) extends Serializable {
  // Given one endpoint id, returns the other
  def otherVertexId(vid: VertexId): VertexId
  // Direction of this edge relative to `vid` (Out if vid is the source, In if it is the destination)
  def relativeDirection(vid: VertexId): EdgeDirection
}

// An edge together with the attributes of both endpoint vertices
class EdgeTriplet[VD, ED] extends Edge[ED] {
  var srcAttr: VD
  var dstAttr: VD
  // Attribute of the endpoint that is not `vid`
  def otherVertexAttr(vid: VertexId): VD
  // Attribute of the endpoint `vid`
  def vertexAttr(vid: VertexId): VD
  def toTuple: ((VertexId, VD), (VertexId, VD), ED)
}

// Vertex identifiers are 64-bit longs (declared in the graphx package object)
type VertexId = Long

// A class with four fixed instances (not a scala.Enumeration)
class EdgeDirection private (name: String) extends Serializable {
  def reverse: EdgeDirection
}

object EdgeDirection {
  final val In: EdgeDirection      // edges arriving at a vertex
  final val Out: EdgeDirection     // edges originating from a vertex
  final val Either: EdgeDirection  // edges originating from or arriving at a vertex
  final val Both: EdgeDirection    // edges originating from and arriving at a vertex
}

215

```

216

217

### Graph Algorithms

218

219

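GraphX provides implementations of common graph algorithms in `org.apache.spark.graphx.lib`. The most common ones (`pageRank`, `connectedComponents`, `stronglyConnectedComponents`, `triangleCount`) can also be called directly as methods on a `Graph`.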

220

221

#### PageRank

222

223

```scala { .api }

224

object PageRank {
  // Static PageRank: exactly `numIter` iterations; `resetProb` is the random-reset probability
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

  // Dynamic PageRank: iterates until ranks change by less than `tol`
  def runUntilConvergence[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

  // Static PageRank; passing Some(srcId) computes personalized PageRank relative to that vertex
  def runWithOptions[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED],
      numIter: Int,
      resetProb: Double = 0.15,
      srcId: Option[VertexId] = None): Graph[Double, Double]
}

232

```

233

234

Usage example:

235

236

```scala

237

val graph: Graph[String, String] = ??? // build the graph, e.g. as in Basic Usage
val ranks = graph.pageRank(0.0001).vertices
// Ten highest-ranked (vertexId, rank) pairs, highest first
val topRanks = ranks.top(10)(Ordering.by(_._2))

240

```

241

242

#### Connected Components

243

244

```scala { .api }

245

object ConnectedComponents {
  // Labels every vertex with the lowest vertex id in its connected component
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]
  // Same, but stops after at most `maxIterations` Pregel iterations
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], maxIterations: Int): Graph[VertexId, ED]
}

249

```

250

251

#### Strongly Connected Components

252

253

```scala { .api }

254

object StronglyConnectedComponents {
  // Labels every vertex with the lowest vertex id in its strongly connected component;
  // `numIter` bounds the number of iterations
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numIter: Int): Graph[VertexId, ED]
}

257

```

258

259

#### Triangle Count

260

261

```scala { .api }

262

object TriangleCount {
  // Number of triangles passing through each vertex
  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
  // Same, but assumes the graph is already canonical (srcId < dstId for every edge, no duplicate edges)
  def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]
}

266

```

267

268

#### Label Propagation

269

270

```scala { .api }

271

object LabelPropagation {
  /**
   * Static label propagation for community detection. Each vertex starts in its own
   * community (input vertex attributes are ignored). In each of `maxSteps` supersteps,
   * every vertex adopts the most frequent label among its neighbours. Convergence is not
   * guaranteed, and the result may collapse into a single community.
   */
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED]
}

274

```

275

276

#### Shortest Paths

277

278

```scala { .api }

279

object ShortestPaths {
  /**
   * Unweighted (hop-count) shortest paths to a set of landmark vertices. Edge attributes
   * are ignored. Each vertex attribute becomes a map from each reachable landmark to its
   * distance in hops.
   */
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED]

  // Landmark id -> hop count
  type SPMap = Map[VertexId, Int]
}

284

```

285

286

### Graph Construction and Loading

287

288

#### GraphLoader

289

290

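Utilities for loading graphs from files. `edgeListFile` reads a plain-text edge list with one `srcId dstId` pair per line; lines starting with `#` are skipped.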

291

292

```scala { .api }

293

object GraphLoader {
  /**
   * Loads a graph from a text file with one "srcId dstId" pair per line.
   * Lines starting with '#' are skipped. Every vertex and edge attribute is set to 1.
   *
   * @param canonicalOrientation if true, orients each edge so that srcId < dstId
   * @param numEdgePartitions    partitions for the edge RDD; -1 uses the default parallelism
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[Int, Int]
}

298

```

299

300

Usage example:

301

302

```scala

303

// Load a graph from an edge list file; every vertex and edge attribute is 1
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")

// Edge list file format: one "srcId dstId" pair per line, separated by whitespace;
// lines starting with '#' are skipped.
// Example content:
// # source target
// 1 2
// 2 3
// 3 1

311

```

312

313

### Graph Partitioning

314

315

Control how graph data is distributed across the cluster. Apply a strategy with `graph.partitionBy(strategy)`.

316

317

```scala { .api }

318

// Decides which edge partition each edge is placed in; apply one with graph.partitionBy(...)
trait PartitionStrategy extends Serializable {
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

object PartitionStrategy {
  // Hashes (src, dst): same-direction edges between two vertices share a partition
  case object RandomVertexCut extends PartitionStrategy
  // Partitions by source vertex only, co-locating all out-edges of a vertex
  case object EdgePartition1D extends PartitionStrategy
  // 2D partitioning of the adjacency matrix; bounds vertex replication by 2 * sqrt(numParts)
  case object EdgePartition2D extends PartitionStrategy
  // Like RandomVertexCut but hashes the endpoints in canonical order, co-locating edges in both directions
  case object CanonicalRandomVertexCut extends PartitionStrategy

  // Parses one of the strategy names above; throws IllegalArgumentException otherwise
  def fromString(s: String): PartitionStrategy
}

330

```

331

332

### Message Passing

333

334

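GraphX provides the `aggregateMessages` API for efficient message passing. `sendMsg` runs once per edge and may send messages to the edge's source and/or destination vertex, and `mergeMsg` combines all messages addressed to the same vertex. A narrower `TripletFields` value tells GraphX which attributes `sendMsg` reads, so the others need not be shipped.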

335

336

```scala { .api }

337

// View of one edge passed to the aggregateMessages sendMsg function
abstract class EdgeContext[VD, ED, A] {
  def srcId: VertexId
  def dstId: VertexId
  def srcAttr: VD
  def dstAttr: VD
  def attr: ED
  // Send a message of type A to the source / destination vertex of this edge
  def sendToSrc(msg: A): Unit
  def sendToDst(msg: A): Unit
  def toEdgeTriplet: EdgeTriplet[VD, ED]
}

// Declared in Java as a class with public final boolean fields (not a Scala case class):
// construct with `new TripletFields(useSrc, useDst, useEdge)` or use a predefined constant.
class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean) extends Serializable

object TripletFields {
  val None = new TripletFields(false, false, false)
  val EdgeOnly = new TripletFields(false, false, true)
  // Source attribute and edge attribute
  val Src = new TripletFields(true, false, true)
  // Destination attribute and edge attribute
  val Dst = new TripletFields(false, true, true)
  val All = new TripletFields(true, true, true)
}

357

```

358

359

Usage example:

360

361

```scala

362

val graph: Graph[Double, Double] = ??? // build or load a graph here

// Sum the attributes of each vertex's neighbours, following edges in both directions
val neighborSum: VertexRDD[Double] = graph.aggregateMessages[Double](
  ctx => {
    // Send the source attribute to the destination
    ctx.sendToDst(ctx.srcAttr)
    // Send the destination attribute to the source
    ctx.sendToSrc(ctx.dstAttr)
  },
  // Merge function: add up all messages addressed to the same vertex
  (a, b) => a + b
)

// Replace each vertex attribute with its neighbour sum. Use outerJoinVertices, not
// joinVertices: only its function receives an Option, so vertices that received no
// message (e.g. isolated vertices) can default to 0.0.
val newGraph: Graph[Double, Double] = graph.outerJoinVertices(neighborSum) { (_, _, msgSum) =>
  msgSum.getOrElse(0.0)
}

380

```

381

382

### Pregel API

383

384

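The Pregel API is a vertex-centric approach to graph computation. Computation proceeds in supersteps: each vertex applies `vprog` to its merged incoming message, then `sendMsg` runs on edges adjacent to vertices that received a message. Iteration stops when no messages remain or after `maxIterations` supersteps.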

385

386

```scala { .api }

387

/**
 * Bulk-synchronous, vertex-centric message passing.
 *
 * In the first superstep every vertex runs `vprog` with `initialMsg`. In each later
 * superstep, `sendMsg` runs on edges adjacent to vertices that received a message in the
 * previous superstep (which edges is controlled by `activeDirection`). Messages bound for
 * the same vertex are combined with `mergeMsg`, and `vprog` updates each receiving vertex.
 * Iteration stops when no messages are sent or after `maxIterations` supersteps.
 */
object Pregel {
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
      graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Graph[VD, ED]
}

395

```

396

397

Usage example:

398

399

```scala

400

// Single-source shortest path using Pregel
/**
 * Computes the weighted shortest-path distance from `sourceId` to every vertex.
 *
 * @param graph    graph whose edge attributes are edge weights; vertex attributes are ignored.
 *                 It must not contain a negative-weight cycle reachable from the source,
 *                 otherwise messages never stop flowing.
 * @param sourceId vertex from which distances are measured
 * @return the same edges, with each vertex attribute set to its distance from `sourceId`
 *         (Double.PositiveInfinity when unreachable)
 */
def shortestPaths[VD](graph: Graph[VD, Double], sourceId: VertexId): Graph[Double, Double] = {
  // The source starts at distance 0; every other vertex is unreached
  val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

  initialGraph.pregel(Double.PositiveInfinity)(
    // Vertex program: keep the smaller of the current and the proposed distance
    (_, dist, newDist) => math.min(dist, newDist),

    // Send message: offer the destination a shorter path through the source, if there is one
    triplet => {
      val candidate = triplet.srcAttr + triplet.attr
      if (candidate < triplet.dstAttr) Iterator((triplet.dstId, candidate))
      else Iterator.empty
    },

    // Merge messages: take the minimum distance
    (a, b) => math.min(a, b)
  )
}

423

```

424

425

### GraphX Utilities

426

427

Generators for synthetic graphs (in `org.apache.spark.graphx.util`), useful for testing and benchmarking.

428

429

```scala { .api }

430

// Lives in org.apache.spark.graphx.util
object GraphGenerators {
  // Vertex out-degrees are drawn from a log-normal distribution (mu, sigma); each vertex
  // attribute is its generated out-degree. numEParts = 0 uses the default parallelism.
  def logNormalGraph(sc: SparkContext, numVertices: Int, numEParts: Int = 0, mu: Double = 4.0,
      sigma: Double = 1.3, seed: Long = -1): Graph[Long, Int]

  // R-MAT graph. The quadrant probabilities are fixed constants (RMATa = 0.45, RMATb = 0.15,
  // RMATc = 0.15, RMATd = 0.25), not parameters; the vertex count is rounded up to a power of two.
  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): Graph[Int, Int]

  // Star graph with vertex 0 at the centre
  def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int]

  // rows x cols grid; each vertex attribute is its (row, col) position and every edge weight is 1.0
  def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int, Int), Double]
}

441

```

442

443

Usage example:

444

445

```scala

446

// GraphGenerators is not covered by the graphx._ / graphx.lib._ imports
import org.apache.spark.graphx.util.GraphGenerators

// Generate a synthetic graph
val syntheticGraph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)

// Run PageRank on the synthetic graph
val ranks = syntheticGraph.pageRank(0.001).vertices
// Ten highest-ranked (vertexId, rank) pairs, highest first
val topVertices = ranks.top(10)(Ordering.by(_._2))

452

```