or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core.md · deployment.md · graphx.md · index.md · ml.md · sql.md · streaming.md

docs/graphx.md

0

# Graph Processing

1

2

GraphX is Spark's API for graphs and graph-parallel computation. It exposes fundamental operators such as subgraph, joinVertices, and aggregateMessages, an optimized variant of the Pregel API, and a library of graph algorithms including PageRank, connected components, and triangle counting.

3

4

## Capabilities

5

6

### Graph[VD, ED]

7

8

Main graph abstraction representing a directed multigraph with user-defined vertex and edge attributes.

9

10

```scala { .api }

11

/**

12

* Main graph abstraction with vertex and edge attributes

13

* @tparam VD vertex attribute type

14

* @tparam ED edge attribute type

15

*/

16

abstract class Graph[VD: ClassTag, ED: ClassTag] {

17

/** Graph vertices with attributes */

18

val vertices: VertexRDD[VD]

19

/** Graph edges with attributes */

20

val edges: EdgeRDD[ED]

21

/** Edge triplets (src vertex, edge, dst vertex) */

22

val triplets: RDD[EdgeTriplet[VD, ED]]

23

24

/** Transform vertex attributes */

25

def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

26

/** Transform edge attributes */

27

def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

28

/** Transform edge attributes using triplet information */

29

def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

30

31

/** Reverse edge directions */

32

def reverse: Graph[VD, ED]

33

/** Keep vertices satisfying vpred and edges satisfying epred; edges whose endpoints were removed are dropped as well */

34

def subgraph(

35

epred: EdgeTriplet[VD, ED] => Boolean = (x => true),

36

vpred: (VertexId, VD) => Boolean = ((v, d) => true)

37

): Graph[VD, ED]

38

39

/** Join vertex attributes with an RDD (at most one entry per vertex); vertices without a matching entry keep their original attribute */

40

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD): Graph[VD, ED]

41

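/** Send messages along edges and merge them per vertex; returns the merged message for each vertex that received one (vertices with no messages are absent, and the graph itself is not modified) */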

42

def aggregateMessages[A: ClassTag](

43

sendMsg: EdgeContext[VD, ED, A] => Unit,

44

mergeMsg: (A, A) => A,

45

tripletFields: TripletFields = TripletFields.All

46

): VertexRDD[A]

47

48

/** Number of vertices */

49

def numVertices: Long

50

/** Number of edges */

51

def numEdges: Long

52

/** In-degree of each vertex; vertices with no incoming edges are not included */

53

def inDegrees: VertexRDD[Int]

54

/** Out-degree of each vertex; vertices with no outgoing edges are not included */

55

def outDegrees: VertexRDD[Int]

56

/** Total (in + out) degree of each vertex; vertices with no edges are not included */

57

def degrees: VertexRDD[Int]

58

59

/** Persist graph in memory/disk */

60

def persist(newLevel: StorageLevel): Graph[VD, ED]

61

/** Cache graph in memory */

62

def cache(): Graph[VD, ED]

63

/** Remove graph from cache */

64

def unpersist(blocking: Boolean = false): Graph[VD, ED]

65

}

66

```

67

68

**Usage Examples:**

69

70

```scala

71

import org.apache.spark.graphx._

72

73

// Create graph from vertex and edge RDDs

74

val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(

75

(1L, "Alice"), (2L, "Bob"), (3L, "Charlie"), (4L, "David")

76

))

77

78

val edges: RDD[Edge[String]] = sc.parallelize(Array(

79

Edge(1L, 2L, "friend"),

80

Edge(2L, 3L, "follow"),

81

Edge(3L, 4L, "friend")

82

))

83

84

val graph = Graph(vertices, edges)

85

86

// Transform vertices

87

val upperGraph = graph.mapVertices((id, name) => name.toUpperCase)

88

89

// Keep only vertices whose name is longer than 3 characters; edges touching removed vertices are dropped too

90

val longNameUsers = graph.subgraph(vpred = (id, name) => name.length > 3)

91

92

// Aggregate messages (compute degrees manually)

93

val degrees = graph.aggregateMessages[Int](

94

triplet => {

95

triplet.sendToSrc(1)

96

triplet.sendToDst(1)

97

},

98

(a, b) => a + b

99

)

100

```

101

102

### Graph Construction

103

104

Factory methods and utilities for creating graphs.

105

106

```scala { .api }

107

/**

108

* Graph object with factory methods

109

*/

110

object Graph {

111

/** Create graph from vertex and edge RDDs */

112

def apply[VD: ClassTag, ED: ClassTag](

113

vertices: RDD[(VertexId, VD)],

114

edges: RDD[Edge[ED]],

115

defaultVertexAttr: VD = null.asInstanceOf[VD] // used for vertices that appear in edges but not in `vertices`

116

): Graph[VD, ED]

117

118

/** Create graph from edges with default vertex attributes */

119

def fromEdges[VD: ClassTag, ED: ClassTag](

120

edges: RDD[Edge[ED]],

121

defaultValue: VD

122

): Graph[VD, ED]

123

124

/** Create graph from edge tuples */

125

def fromEdgeTuples[VD: ClassTag](

126

rawEdges: RDD[(VertexId, VertexId)],

127

defaultValue: VD,

128

uniqueEdges: Option[PartitionStrategy] = None

129

): Graph[VD, Int]

130

}

131

132

/**

133

* GraphLoader provides utilities for loading graphs from files

134

*/

135

object GraphLoader {

136

/** Load graph from edge list file */

137

def edgeListFile(

138

sc: SparkContext,

139

path: String,

140

canonicalOrientation: Boolean = false,

141

numEdgePartitions: Int = -1,

142

edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

143

vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

144

): Graph[Int, Int]

145

}

146

```

147

148

**Usage Examples:**

149

150

```scala

151

// Load from edge list file

152

val graph = GraphLoader.edgeListFile(sc, "hdfs://path/to/edges.txt")

153

154

// Create from edge tuples

155

val edgeTuples = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L)))

156

val tupleGraph = Graph.fromEdgeTuples(edgeTuples, "defaultVertex")

157

158

// Create from edges only

159

val edgeList = sc.parallelize(Array(

160

Edge(1L, 2L, 1.0),

161

Edge(2L, 3L, 2.0)

162

))

163

val edgeGraph = Graph.fromEdges(edgeList, "missing")

164

```

165

166

### Core Types

167

168

Fundamental types used in graph processing.

169

170

```scala { .api }

171

/** Vertex identifier type */

172

type VertexId = Long

173

174

/**

175

* Edge with source, destination, and attribute

176

*/

177

case class Edge[ED](var srcId: VertexId = 0, var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED])

178

179

/**

180

* Edge with source and destination vertex attributes

181

*/

182

class EdgeTriplet[VD, ED] extends Edge[ED] {

183

/** Source vertex attribute */

184

var srcAttr: VD = _

185

/** Destination vertex attribute */

186

var dstAttr: VD = _

187

188

/** Copy srcId, dstId, and attr from another edge into this triplet (Spark-internal; srcAttr/dstAttr are assigned separately) */

189

protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED]

190

}

191

192

/**

193

* Context for sending messages in aggregateMessages

194

*/

195

abstract class EdgeContext[VD, ED, A] {

196

/** Source vertex ID */

197

def srcId: VertexId

198

/** Destination vertex ID */

199

def dstId: VertexId

200

/** Source vertex attribute */

201

def srcAttr: VD

202

/** Destination vertex attribute */

203

def dstAttr: VD

204

/** Edge attribute */

205

def attr: ED

206

207

/** Send message to source vertex */

208

def sendToSrc(msg: A): Unit

209

/** Send message to destination vertex */

210

def sendToDst(msg: A): Unit

211

}

212

```

213

214

### VertexRDD[VD]

215

216

Specialized RDD for vertices with efficient joins and graph operations.

217

218

```scala { .api }

219

/**

220

* RDD of vertices with efficient joins

221

*/

222

abstract class VertexRDD[VD: ClassTag] extends RDD[(VertexId, VD)] {

223

/** Filter vertices by predicate */

224

def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD]

225

def filter(pred: (VertexId, VD) => Boolean): VertexRDD[VD]

226

227

/** Transform vertex values */

228

def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]

229

def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

230

231

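/** For vertices present in both RDDs, return only those whose values differ, keeping the value from `other` (intended for VertexRDDs that share a common ancestor) */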

232

def diff(other: VertexRDD[VD]): VertexRDD[VD]

233

234

/** Left join with another RDD */

235

def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(

236

f: (VertexId, VD, Option[VD2]) => VD3

237

): VertexRDD[VD3]

238

239

/** Inner join with another RDD */

240

def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(

241

f: (VertexId, VD, U) => VD2

242

): VertexRDD[VD2]

243

244

/** Aggregate values by vertex ID */

245

def aggregateUsingIndex[VD2: ClassTag](

246

messages: RDD[(VertexId, VD2)],

247

reduceFunc: (VD2, VD2) => VD2

248

): VertexRDD[VD2]

249

}

250

```

251

252

### EdgeRDD[ED]

253

254

Specialized RDD for edges with graph-specific optimizations.

255

256

```scala { .api }

257

/**

258

* RDD of edges with graph-specific optimizations

259

*/

260

abstract class EdgeRDD[ED: ClassTag] extends RDD[Edge[ED]] {

261

/** Transform edge values */

262

def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

263

264

/** Reverse edge directions */

265

def reverse: EdgeRDD[ED]

266

267

/** Filter edges using triplet information */

268

def filter(pred: EdgeTriplet[_, ED] => Boolean): EdgeRDD[ED]

269

270

/** Inner join with another EdgeRDD, assuming both are partitioned with the same PartitionStrategy */

271

def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(

272

f: (VertexId, VertexId, ED, ED2) => ED3

273

): EdgeRDD[ED3]

274

}

275

```

276

277

### Graph Algorithms

278

279

Pre-implemented graph algorithms available through GraphOps.

280

281

```scala { .api }

282

/**

283

* Additional operations available on Graph through implicit conversion

284

*/

285

implicit class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {

286

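/** Run dynamic PageRank, iterating until ranks converge within tolerance `tol`; vertex attributes hold the rank, edge attributes the normalized edge weight */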

287

def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

288

289

/** Run static PageRank for fixed number of iterations */

290

def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

291

292

/** Find connected components */

293

def connectedComponents(): Graph[VertexId, ED]

294

295

/** Count triangles passing through each vertex */

296

def triangleCount(): Graph[Int, ED]

297

298

/** Find strongly connected components */

299

def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

300

301

/** Collect neighbor IDs for each vertex (EdgeDirection.Both is not supported; use EdgeDirection.Either) */

302

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

303

304

/** Collect neighbor IDs and attributes for each vertex (EdgeDirection.Both is not supported; use EdgeDirection.Either) */

305

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

306

}

307

308

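/**
* Direction of edges relative to a vertex, used by collectNeighborIds/collectNeighbors
* and by Pregel's activeDirection. It is a class with four fixed instances (not a
* scala.Enumeration), so `EdgeDirection` itself is the parameter type.
*/
class EdgeDirection private (private val name: String) extends Serializable {
  /** Swap In and Out; Either and Both are their own reverse */
  def reverse: EdgeDirection
}

object EdgeDirection {
  /** Edges arriving at a vertex */
  final val In: EdgeDirection
  /** Edges originating from a vertex */
  final val Out: EdgeDirection
  /** Edges originating from or arriving at a vertex */
  final val Either: EdgeDirection
  /** Edges originating from and arriving at a vertex */
  final val Both: EdgeDirection
}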

314

```

315

316

**Usage Examples:**

317

318

```scala

319

import org.apache.spark.graphx.lib._

320

321

// PageRank

322

val pageRankGraph = graph.pageRank(0.0001, 0.15)

323

val pageRanks = pageRankGraph.vertices.collect()

324

325

// Connected Components

326

val ccGraph = graph.connectedComponents()

327

val components = ccGraph.vertices.collect()

328

329

// Triangle Count

330

val triangleCountGraph = graph.triangleCount()

331

val triangleCounts = triangleCountGraph.vertices.collect()

332

333

// Collect neighbors

334

val neighbors = graph.collectNeighborIds(EdgeDirection.Out)

335

```

336

337

### Pregel API

338

339

Pregel-style bulk-synchronous message-passing abstraction.

340

341

```scala { .api }

342

/**

343

* Pregel-style computation

344

*/

345

object Pregel {

346

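/** Run a Pregel computation: every vertex first runs vprog on initialMsg; in later supersteps only vertices that received a message run vprog. Stops when no messages remain or after maxIterations */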

347

def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](

348

graph: Graph[VD, ED],

349

initialMsg: A,

350

maxIterations: Int = Int.MaxValue,

351

activeDirection: EdgeDirection = EdgeDirection.Either

352

)(

353

vprog: (VertexId, VD, A) => VD,

354

sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],

355

mergeMsg: (A, A) => A

356

): Graph[VD, ED]

357

}

358

```

359

360

**Usage Examples:**

361

362

```scala

363

// Single Source Shortest Path using Pregel

364

def shortestPaths(graph: Graph[Double, Double], sourceId: VertexId): Graph[Double, Double] = {

365

val initialGraph = graph.mapVertices((id, _) =>

366

if (id == sourceId) 0.0 else Double.PositiveInfinity

367

)

368

369

Pregel(initialGraph, Double.PositiveInfinity)(

370

// Vertex program: update vertex value with minimum distance

371

(id, dist, newDist) => math.min(dist, newDist),

372

// Send message: send distance + edge weight to neighbors

373

triplet => {

374

if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {

375

Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))

376

} else {

377

Iterator.empty

378

}

379

},

380

// Merge messages: take minimum

381

(a, b) => math.min(a, b)

382

)

383

}

384

```

385

386

### Graph Partitioning

387

388

Strategies for partitioning graphs across cluster nodes.

389

390

```scala { .api }

391

/**

392

* Partitioning strategies for graph distribution

393

*/

394

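trait PartitionStrategy extends Serializable {
  /** Partition number for an edge with the given endpoints */
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

object PartitionStrategy {
  /** Hash source and destination IDs together; colocates all same-direction edges between two vertices */
  case object RandomVertexCut extends PartitionStrategy
  /** Hash only the source vertex ID; colocates all edges with the same source */
  case object EdgePartition1D extends PartitionStrategy
  /** 2D partitioning of the sparse edge adjacency matrix; bounds vertex replication by about 2 * sqrt(numParts) */
  case object EdgePartition2D extends PartitionStrategy
  /** Like RandomVertexCut but hashes the IDs in canonical order; colocates all edges between two vertices regardless of direction */
  case object CanonicalRandomVertexCut extends PartitionStrategy

  /** Look up a strategy by name, e.g. "EdgePartition2D" */
  def fromString(s: String): PartitionStrategy
}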

404

405

/**

406

* Partition graphs efficiently

407

*/

408

implicit class GraphPartitioning[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {

409

/** Repartition graph using specified strategy */

410

def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

411

def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]

412

}

413

```

414

415

### Graph I/O

416

417

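Loading graphs is covered by `GraphLoader` above. GraphX has no dedicated graph writer, and the `GraphWriter` helper sketched below is not part of Spark's API. To persist a graph, save its `vertices` and `edges` RDDs with standard RDD methods such as `saveAsTextFile`, as the usage example below does.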

418

419

```scala { .api }

420

/**

421

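* Illustrative helper only — not part of the Spark GraphX API. Equivalent to graph.vertices.saveAsTextFile(path) and graph.edges.saveAsTextFile(path)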

422

*/

423

implicit class GraphWriter[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {

424

/** Save vertices to text file */

425

def saveVerticesAsTextFile(path: String): Unit

426

/** Save edges to text file */

427

def saveEdgesAsTextFile(path: String): Unit

428

}

429

```

430

431

**Usage Examples:**

432

433

```scala

434

// Partition graph

435

val partitionedGraph = graph

436

.partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 4)

437

438

// Save graph

439

graph.vertices.saveAsTextFile("hdfs://path/to/vertices")

440

graph.edges.saveAsTextFile("hdfs://path/to/edges")

441

442

// Complex graph analysis pipeline (assumes friendships: RDD[(VertexId, VertexId)] is already defined)

443

val socialGraph = Graph.fromEdgeTuples(friendships, "User")

444

445

val pageRanks = socialGraph

446

.pageRank(0.0001)

447

.vertices

448

.sortBy(_._2, ascending = false)

449

450

val communities = socialGraph

451

.connectedComponents()

452

.vertices

453

.map { case (userId, componentId) => (componentId, userId) }

454

.groupByKey()

455

.collect()

456

```

457

458

## Error Handling

459

460

Common GraphX exceptions:

461

462

- `IllegalArgumentException` - Invalid graph construction parameters

463

- `SparkException` - General Spark execution errors during graph operations

464

- `ClassCastException` - Type mismatches in vertex/edge attributes