or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-graph-api.md graph-algorithms.md index.md pregel-api.md utilities.md

core-graph-api.mddocs/

0

# Core Graph API

1

2

Fundamental graph construction, transformation, and analysis operations for building and manipulating distributed graph structures in GraphX.

3

4

## Capabilities

5

6

### Graph Construction

7

8

Create graphs from vertex and edge RDDs with full type safety and optimized partitioning.

9

10

```scala { .api }

11

/**

12

* Construct a graph from vertex and edge RDDs

13

* @param vertices RDD of (VertexId, VertexAttribute) pairs

14

* @param edges RDD of Edge objects with attributes

15

* @param defaultVertexAttr Attribute assigned to vertices that are referenced by an edge but missing from the vertices RDD

16

* @param edgeStorageLevel Storage level for edges

17

* @param vertexStorageLevel Storage level for vertices

18

* @returns New graph instance

19

*/

20

def Graph.apply[VD: ClassTag, ED: ClassTag](

21

vertices: RDD[(VertexId, VD)],

22

edges: RDD[Edge[ED]],

23

defaultVertexAttr: VD = null.asInstanceOf[VD],

24

edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

25

vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

26

): Graph[VD, ED]

27

28

/**

29

* Construct a graph from just edges, creating vertices with default attributes

30

* @param edges RDD of edges

31

* @param defaultValue Default vertex attribute

32

* @param edgeStorageLevel Storage level for edges

34

* @param vertexStorageLevel Storage level for vertices

35

* @returns New graph with inferred vertices

36

*/

37

def Graph.fromEdges[VD: ClassTag, ED: ClassTag](

38

edges: RDD[Edge[ED]],

39

defaultValue: VD,

41

edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

42

vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

43

): Graph[VD, ED]

44

45

/**

46

* Construct a graph from (srcId, dstId) tuples, setting every edge attribute to 1

47

* @param rawEdges RDD of (srcId, dstId) tuples

48

* @param defaultValue Default vertex attribute

49

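* @param uniqueEdges If Some(strategy), edges are repartitioned with that strategy and identical edges are merged, with the edge attribute set to the number of duplicates; if None, duplicate edges are kept as separate edges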

50

* @param edgeStorageLevel Storage level for edges

51

* @param vertexStorageLevel Storage level for vertices

52

* @returns New graph with integer edge weights

53

*/

54

def Graph.fromEdgeTuples[VD: ClassTag](

55

rawEdges: RDD[(VertexId, VertexId)],

56

defaultValue: VD,

57

uniqueEdges: Option[PartitionStrategy] = None,

58

edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

59

vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

60

): Graph[VD, Int]

61

```

62

63

**Usage Examples:**

64

65

```scala

66

import org.apache.spark.graphx._

67

68

// Create from vertices and edges

69

val vertices = sc.parallelize(Array((1L, "Alice"), (2L, "Bob")))

70

val edges = sc.parallelize(Array(Edge(1L, 2L, "friend")))

71

val graph = Graph(vertices, edges)

72

73

// Create from edges only

74

val weightedEdges = sc.parallelize(Array(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0)))

75

val edgeGraph = Graph.fromEdges(weightedEdges, defaultValue = "Unknown")

76

77

// Create from edge tuples

78

val edgeTuples = sc.parallelize(Array((1L, 2L), (2L, 3L), (3L, 1L)))

79

val tupleGraph = Graph.fromEdgeTuples(edgeTuples, defaultValue = 0)

80

```

81

82

### Graph Properties

83

84

Access basic graph metrics and structure information.

85

86

```scala { .api }

87

abstract class Graph[VD: ClassTag, ED: ClassTag] {

88

/** RDD containing vertices and their attributes */

89

val vertices: VertexRDD[VD]

90

91

/** RDD containing edges and their attributes */

92

val edges: EdgeRDD[ED]

93

94

/** RDD of edge triplets with adjacent vertex attributes */

95

val triplets: RDD[EdgeTriplet[VD, ED]]

96

}

97

98

class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {

99

/** Total number of vertices in the graph */

100

def numVertices: Long

101

102

/** Total number of edges in the graph */

103

def numEdges: Long

104

105

/** In-degree of each vertex (vertices with no in-edges are not included in the result) */

106

def inDegrees: VertexRDD[Int]

107

108

/** Out-degree of each vertex (vertices with no out-edges are not included in the result) */

109

def outDegrees: VertexRDD[Int]

110

111

/** Total degree (in + out) of each vertex (vertices with no edges are not included in the result) */

112

def degrees: VertexRDD[Int]

113

}

114

```

115

116

### Graph Transformations

117

118

Transform vertex and edge attributes while preserving graph structure.

119

120

```scala { .api }

121

/**

122

* Transform vertex attributes using a mapping function

123

* @param map Function transforming (VertexId, VertexAttribute) to new attribute

124

* @returns New graph with transformed vertex attributes

125

*/

126

def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

127

128

/**

129

* Transform edge attributes using edge objects

130

* @param map Function transforming Edge to new edge attribute

131

* @returns New graph with transformed edge attributes

132

*/

133

def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

134

135

/**

136

* Transform edge attributes using triplets (includes adjacent vertex data)

137

* @param map Function transforming EdgeTriplet to new edge attribute

138

* @returns New graph with transformed edge attributes

139

*/

140

def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

141

142

/**

143

* Transform edge attributes using triplets with optimization hints

144

* @param map Function transforming EdgeTriplet to new edge attribute

145

* @param tripletFields Fields accessed by map function for optimization

146

* @returns New graph with transformed edge attributes

147

*/

148

def mapTriplets[ED2: ClassTag](

149

map: EdgeTriplet[VD, ED] => ED2,

150

tripletFields: TripletFields

151

): Graph[VD, ED2]

152

153

/**

154

* Reverse the direction of all edges

155

* @returns New graph with reversed edges

156

*/

157

def reverse: Graph[VD, ED]

158

```

159

160

**Usage Examples:**

161

162

```scala

163

// Transform vertex attributes

164

val upperGraph = graph.mapVertices((id, name) => name.toUpperCase)

165

166

// Transform edge attributes

167

val weightedGraph = graph.mapEdges(edge => edge.attr.length)

168

169

// Transform edges using adjacent vertex data

170

val labeledGraph = graph.mapTriplets(triplet =>

171

s"${triplet.srcAttr}->${triplet.dstAttr}")

172

173

// Reverse all edges

174

val reversedGraph = graph.reverse

175

```

176

177

### Graph Filtering and Subgraphs

178

179

Filter graphs by vertex and edge predicates to create subgraphs.

180

181

```scala { .api }

182

/**

183

* Filter graph by edge and vertex predicates

184

* @param epred Edge predicate function (EdgeTriplet => Boolean)

185

* @param vpred Vertex predicate function ((VertexId, VD) => Boolean)

186

* @returns Subgraph containing only vertices/edges satisfying predicates

187

*/

188

def subgraph(

189

epred: EdgeTriplet[VD, ED] => Boolean = (x => true),

190

vpred: (VertexId, VD) => Boolean = ((v, d) => true)

191

): Graph[VD, ED]

192

193

/**

194

* Restrict graph to vertices and edges also present in another graph

195

* @param other Graph defining the mask

196

* @returns Intersection of current graph with other graph

197

*/

198

def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]

199

200

/**

201

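* Filter the graph by first computing values to filter on (preprocess), applying the predicates to that derived graph, and then masking the original graph with the result, so the returned subgraph keeps the original attributes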

202

* @param preprocess Function to preprocess graph before filtering

203

* @param epred Edge predicate

204

* @param vpred Vertex predicate

205

* @returns Filtered graph

206

*/

207

def filter[VD2: ClassTag, ED2: ClassTag](

208

preprocess: Graph[VD, ED] => Graph[VD2, ED2],

209

epred: EdgeTriplet[VD2, ED2] => Boolean,

210

vpred: (VertexId, VD2) => Boolean

211

): Graph[VD, ED]

212

```

213

214

### Graph Joins and Aggregation

215

216

Join graphs with RDDs and perform message-passing aggregation operations.

217

218

```scala { .api }

219

/**

220

* Join vertices with an RDD, transforming vertex attributes; vertices with no matching entry in the table keep their original attribute

221

* @param table RDD of (VertexId, U) pairs to join

222

* @param mapFunc Function to combine vertex attribute and table value

223

* @returns New graph with joined vertex attributes

224

*/

225

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(

226

mapFunc: (VertexId, VD, U) => VD

227

): Graph[VD, ED]

228

229

/**

230

* Left outer join vertices with an RDD

231

* @param other RDD to join with

232

* @param mapFunc Function handling (VertexId, VD, Option[U])

233

* @returns New graph with joined attributes

234

*/

235

def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(

236

mapFunc: (VertexId, VD, Option[U]) => VD2

237

): Graph[VD2, ED]

238

239

/**

240

* Core message-passing aggregation API

241

* @param sendMsg Function defining messages sent along edges

242

* @param mergeMsg Function combining messages at vertices

243

* @param tripletFields Fields accessed for optimization

244

* @returns VertexRDD with aggregated messages

245

*/

246

def aggregateMessages[A: ClassTag](

247

sendMsg: EdgeContext[VD, ED, A] => Unit,

248

mergeMsg: (A, A) => A,

249

tripletFields: TripletFields = TripletFields.All

250

): VertexRDD[A]

251

```

252

253

**Usage Examples:**

254

255

```scala

256

// Join with user ages

257

val ages = sc.parallelize(Array((1L, 25), (2L, 30)))

258

val graphWithAges = graph.joinVertices(ages)((id, name, age) => s"$name ($age)")

259

260

// Subgraph filtering

261

val activeUsers = graph.subgraph(

262

vpred = (id, user) => user.active,

263

epred = triplet => triplet.attr == "friend"

264

)

265

266

// Message aggregation - compute in-degrees

267

val inDegrees = graph.aggregateMessages[Int](

268

sendMsg = ctx => ctx.sendToDst(1),

269

mergeMsg = (a, b) => a + b

270

)

271

```

272

273

### Graph Persistence and Checkpointing

274

275

Control caching, persistence, and fault tolerance for iterative graph algorithms.

276

277

```scala { .api }

278

/**

279

* Persist graph at specified storage level

280

* @param newLevel Storage level for vertices and edges

281

* @returns Graph with specified persistence level

282

*/

283

def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

284

285

/**

286

* Cache graph at default storage level (MEMORY_ONLY)

287

* @returns Cached graph

288

*/

289

def cache(): Graph[VD, ED]

290

291

/**

292

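* Mark graph for checkpointing to enable fault tolerance; the graph is saved under the directory set with SparkContext.setCheckpointDir(), so persist it first to avoid recomputation when it is written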

293

*/

294

def checkpoint(): Unit

295

296

/**

297

* Remove graph from cache/persistence

298

* @param blocking Whether to block until unpersist is complete

299

* @returns Unpersisted graph

300

*/

301

def unpersist(blocking: Boolean = true): Graph[VD, ED]

302

303

/**

304

* Repartition edges using specified partitioning strategy

305

* @param partitionStrategy Strategy for distributing edges

306

* @returns Repartitioned graph

307

*/

308

def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

309

```

310

311

## Core Data Types

312

313

```scala { .api }

314

/** 64-bit vertex identifier */

315

type VertexId = Long

316

317

/** Integer partition identifier (must be < 2^30) */

318

type PartitionID = Int

319

320

/**

321

* Directed edge with source, destination, and attribute

322

*/

323

case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED) {

324

/** Get the other vertex ID in this edge */

325

def otherVertexId(vid: VertexId): VertexId

326

327

/** Get edge direction relative to a vertex */

328

def relativeDirection(vid: VertexId): EdgeDirection

329

}

330

331

/**

332

* Edge with adjacent vertex attributes for message passing

333

*/

334

class EdgeTriplet[VD, ED] extends Edge[ED] {

335

/** Source vertex attribute */

336

val srcAttr: VD

337

338

/** Destination vertex attribute */

339

val dstAttr: VD

340

341

/** Get other vertex attribute */

342

def otherVertexAttr(vid: VertexId): VD

343

344

/** Get vertex attribute for specified vertex */

345

def vertexAttr(vid: VertexId): VD

346

347

/** Convert to tuple representation */

348

def toTuple: ((VertexId, VD), (VertexId, VD), ED)

349

}

350

351

/**

352

* Context for sending messages in aggregateMessages

353

*/

354

abstract class EdgeContext[VD, ED, A] {

355

val srcId: VertexId

356

val dstId: VertexId

357

val srcAttr: VD

358

val dstAttr: VD

359

val attr: ED

360

361

/** Send message to source vertex */

362

def sendToSrc(msg: A): Unit

363

364

/** Send message to destination vertex */

365

def sendToDst(msg: A): Unit

366

367

/** Convert to EdgeTriplet */

368

def toEdgeTriplet: EdgeTriplet[VD, ED]

369

}

370

```

371

372

### Specialized RDDs

373

374

```scala { .api }

375

/**

376

* Specialized RDD for vertices with efficient joins and indexing

377

*/

378

abstract class VertexRDD[VD] extends RDD[(VertexId, VD)] {

379

/** Reindex to contain only visible vertices */

380

def reindex(): VertexRDD[VD]

381

382

/** Transform vertex attributes */

383

def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]

384

def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

385

386

/** Filter vertices by predicate */

387

def filter(pred: ((VertexId, VD)) => Boolean): VertexRDD[VD]

388

389

/** Set difference: vertices of this RDD whose ids do not appear in `other` */

390

def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]

391

392

/** For vertices present in both RDDs, return only those whose values differ, keeping the values from `other` */

393

def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]

394

395

/** Left join with another RDD */

396

def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(

397

f: (VertexId, VD, Option[VD2]) => VD3

398

): VertexRDD[VD3]

399

400

/** Inner join with another RDD */

401

def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(

402

f: (VertexId, VD, U) => VD2

403

): VertexRDD[VD2]

404

}

405

406

/**

407

* Specialized RDD for edges with columnar storage

408

*/

409

abstract class EdgeRDD[ED] extends RDD[Edge[ED]] {

410

/** Transform edge attributes preserving structure */

411

def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

412

413

/** Reverse all edges */

414

def reverse: EdgeRDD[ED]

415

416

/** Inner join with another EdgeRDD; both are assumed to be partitioned with the same PartitionStrategy */

417

def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(

418

f: (VertexId, VertexId, ED, ED2) => ED3

419

): EdgeRDD[ED3]

420

}

421

```