
# RDD Abstractions

Specialized RDD implementations optimized for graph operations with fast joins, efficient storage, and performance optimizations for vertex and edge data.

## Capabilities

### VertexRDD

Specialized RDD for vertex data that extends `RDD[(VertexId, VD)]` with optimizations for graph operations.

```scala { .api }
/**
 * RDD of vertices with pre-indexing for fast joins
 * Ensures only one entry per vertex ID
 */
abstract class VertexRDD[VD](sc: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[(VertexId, VD)](sc, deps) {

  /** Rebuild the index over only the visible vertices */
  def reindex(): VertexRDD[VD]

  /** Transform vertex values while preserving indexing */
  def mapValues[VD2: ClassTag](f: VD => VD2): VertexRDD[VD2]
  def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

  /** Remove vertices whose IDs are present in other RDD */
  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]

  /** Keep vertices present in both RDDs whose values differ, using the values from other */
  def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
}
```

**Usage Examples:**

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Create VertexRDD from regular RDD (sc is an existing SparkContext, as in spark-shell)
val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie")
))
val vertexRDD = VertexRDD(vertices)

// Transform values efficiently
val upperCaseVertices = vertexRDD.mapValues(_.toUpperCase)
val prefixedVertices = vertexRDD.mapValues((id, name) => s"User_$id: $name")

// Set operations
val otherVertices: RDD[(VertexId, String)] = sc.parallelize(Array(
  (2L, "Robert"), (4L, "David")
))

val uniqueVertices = vertexRDD.minus(otherVertices) // Vertices in first but not second
val changedVertices = vertexRDD.diff(otherVertices) // Vertices in both whose values differ
```

### VertexRDD Factory Methods

Static methods for creating VertexRDD instances from various sources.

```scala { .api }
object VertexRDD {
  /**
   * Create VertexRDD from regular RDD of vertex pairs
   */
  def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD]

  /**
   * Create VertexRDD with default values for vertices implied by edges
   * @param vertices Explicit vertex data
   * @param edges Edge RDD to infer additional vertices from
   * @param defaultVal Default value for vertices not in explicit data
   */
  def apply[VD: ClassTag](
    vertices: RDD[(VertexId, VD)],
    edges: EdgeRDD[_],
    defaultVal: VD
  ): VertexRDD[VD]

  /**
   * Create VertexRDD from edges only, using default value for all vertices
   * @param edges Edge RDD to extract vertex IDs from
   * @param numPartitions Number of partitions for the vertex RDD
   * @param defaultVal Default value for all vertices
   */
  def fromEdges[VD: ClassTag](
    edges: EdgeRDD[_],
    numPartitions: Int,
    defaultVal: VD
  ): VertexRDD[VD]
}
```
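
A minimal sketch of how the three factory methods differ, assuming a local `SparkContext` and made-up sample data. The names `explicit`, `edgeRDD`, `withDefaults`, and `fromEdgesOnly` are illustrative and not part of the API.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: a local context for experimentation; in spark-shell, getOrCreate returns the existing one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("vertexrdd-factories"))

// Vertex 3 has no explicit attribute; it only appears as an edge endpoint
val explicit: RDD[(VertexId, String)] = sc.parallelize(Seq((1L, "Alice"), (2L, "Bob")))
val edgeRDD = EdgeRDD.fromEdges[Int, String](
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1))))

// Explicit attributes are kept; vertices known only from edges get the default
val withDefaults = VertexRDD(explicit, edgeRDD, "unknown")

// Every endpoint of every edge gets the default value
val fromEdgesOnly = VertexRDD.fromEdges(edgeRDD, edgeRDD.partitions.length, "unknown")
```

Here `withDefaults` should contain Alice, Bob, and an `"unknown"` entry for vertex 3, while `fromEdgesOnly` should map all three vertex IDs to `"unknown"`.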

### VertexRDD Join Operations

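Join operations that preserve the VertexRDD index. The zip variants (`leftZipJoin`, `innerZipJoin`) take another VertexRDD and are intended for the case where both sides share the same index; `leftJoin` and `innerJoin` accept any RDD of vertex pairs.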

```scala { .api }
/**
 * Left zip join with another VertexRDD (faster than regular join)
 * @param other Other VertexRDD to join with
 * @param f Function to combine values (original, optional other)
 */
def leftZipJoin[VD2: ClassTag, VD3: ClassTag](other: VertexRDD[VD2])(
  f: (VertexId, VD, Option[VD2]) => VD3
): VertexRDD[VD3]

/**
 * Left join with regular RDD
 * @param other Regular RDD to join with
 * @param f Function to combine values
 */
def leftJoin[VD2: ClassTag, VD3: ClassTag](other: RDD[(VertexId, VD2)])(
  f: (VertexId, VD, Option[VD2]) => VD3
): VertexRDD[VD3]

/**
 * Inner zip join with another VertexRDD (only matching keys)
 * @param other Other VertexRDD to join with
 * @param f Function to combine values
 */
def innerZipJoin[U: ClassTag, VD2: ClassTag](other: VertexRDD[U])(
  f: (VertexId, VD, U) => VD2
): VertexRDD[VD2]

/**
 * Inner join with regular RDD
 * @param other Regular RDD to join with
 * @param f Function to combine values
 */
def innerJoin[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(
  f: (VertexId, VD, U) => VD2
): VertexRDD[VD2]
```

**Usage Examples:**

```scala
// Create two VertexRDDs for joining
val names = VertexRDD(sc.parallelize(Array(
  (1L, "Alice"), (2L, "Bob"), (3L, "Charlie")
)))

val ages = VertexRDD(sc.parallelize(Array(
  (1L, 25), (2L, 30) // Note: Charlie missing
)))

// Left zip join - all vertices from left RDD kept
val profiles = names.leftZipJoin(ages) { (id, name, ageOpt) =>
  ageOpt match {
    case Some(age) => s"$name, age $age"
    case None => s"$name, age unknown"
  }
}

// Inner zip join - only matching vertices
val knownAges = names.innerZipJoin(ages) { (id, name, age) =>
  s"$name is $age years old"
}

// Join with regular RDD
val locations: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "NYC"), (3L, "LA")
))

val fullProfiles = names.leftJoin(locations) { (id, name, locationOpt) =>
  val location = locationOpt.getOrElse("Unknown location")
  s"$name lives in $location"
}
```
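
The `innerJoin` variant with a plain RDD can be sketched in the same way. This is an illustrative example only: it assumes a local `SparkContext`, and the `scores` data and the `scored` name are made up for the demonstration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: a local context for experimentation; in spark-shell, getOrCreate returns the existing one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("vertexrdd-innerjoin"))

val names = VertexRDD(sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Charlie"))))

// A plain pair RDD: vertex 2 has no score, and vertex 4 is not in `names`
val scores: RDD[(VertexId, Double)] = sc.parallelize(Seq((1L, 0.9), (3L, 0.4), (4L, 0.7)))

// Only vertex IDs present on both sides survive the inner join
val scored = names.innerJoin(scores) { (id, name, score) => s"$name: $score" }
```

Vertices 2 and 4 are dropped because each is missing from one side, so `scored` should hold entries for vertices 1 and 3 only.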

### VertexRDD Message Aggregation

Specialized aggregation method for message passing operations.

```scala { .api }
/**
 * Aggregate messages using vertex index for efficient routing
 * @param messages RDD of messages sent to vertices
 * @param reduceFunc Function to merge multiple messages to same vertex
 */
def aggregateUsingIndex[VD2: ClassTag](
  messages: RDD[(VertexId, VD2)],
  reduceFunc: (VD2, VD2) => VD2
): VertexRDD[VD2]
```

**Usage Examples:**

```scala
// Simulate message passing - multiple messages to same vertex
val messages: RDD[(VertexId, Int)] = sc.parallelize(Array(
  (1L, 5), (1L, 3), (2L, 7), (1L, 2) // Multiple messages to vertex 1
))

// Aggregate messages by summing
val aggregatedMessages = vertexRDD.aggregateUsingIndex(messages, (a: Int, b: Int) => a + b)

// Result: vertex 1 gets 10 (5+3+2), vertex 2 gets 7
aggregatedMessages.collect().foreach { case (id, sum) =>
  println(s"Vertex $id received total: $sum")
}
```
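
Because the result of `aggregateUsingIndex` reuses the index of the VertexRDD it was called on, it pairs naturally with `leftZipJoin` to fold the aggregated messages back into the vertex data. The following is a sketch under assumptions: a local `SparkContext`, made-up data, and the illustrative names `users`, `msgs`, `totals`, and `summary`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Assumption: a local context for experimentation; in spark-shell, getOrCreate returns the existing one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("vertexrdd-aggregate"))

val users = VertexRDD(sc.parallelize(Seq((1L, "Alice"), (2L, "Bob"), (3L, "Charlie"))))

// Vertex 9 is not in `users`, so its message is not part of the result
val msgs: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 5), (1L, 3), (2L, 7), (9L, 1)))

// Sum the messages per vertex, reusing the index of `users`
val totals = users.aggregateUsingIndex[Int](msgs, _ + _)

// Fold the totals back into the vertex data; vertices without messages get 0
val summary = users.leftZipJoin(totals) { (id, name, total) =>
  s"$name: ${total.getOrElse(0)}"
}
```

Vertex 3 receives no messages, so it is absent from `totals` but still present in `summary` with a total of 0.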

### EdgeRDD

Specialized RDD for edge data with columnar storage optimizations.

```scala { .api }
/**
 * RDD of edges with optimized storage format
 * Extends RDD[Edge[ED]] with graph-specific optimizations
 */
abstract class EdgeRDD[ED](sc: SparkContext, deps: Seq[Dependency[_]])
  extends RDD[Edge[ED]](sc, deps) {

  /** Transform edge attributes while preserving structure */
  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2]

  /** Reverse direction of all edges */
  def reverse: EdgeRDD[ED]

  /** Inner join with another EdgeRDD on (srcId, dstId) */
  def innerJoin[ED2: ClassTag, ED3: ClassTag](other: EdgeRDD[ED2])(
    f: (VertexId, VertexId, ED, ED2) => ED3
  ): EdgeRDD[ED3]
}
```

**Usage Examples:**

```scala
// Create EdgeRDD from regular RDD
val edges: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(1L, 2L, "follows"),
  Edge(2L, 3L, "friend"),
  Edge(3L, 1L, "colleague")
))
// VD (the vertex attribute type) is not determined by the argument,
// so both type parameters are given explicitly
val edgeRDD = EdgeRDD.fromEdges[String, Int](edges)

// Transform edge attributes
val weightedEdges = edgeRDD.mapValues(edge => edge.attr.length)

// Reverse all edges
val reversedEdges = edgeRDD.reverse

// Join with another EdgeRDD
val edgeWeights: RDD[Edge[Double]] = sc.parallelize(Array(
  Edge(1L, 2L, 0.8), Edge(2L, 3L, 0.9)
))
val weightRDD = EdgeRDD.fromEdges[Double, Int](edgeWeights)

// Note: innerJoin assumes both EdgeRDDs are partitioned the same way;
// edges that sit in different partitions on each side are not matched
val joinedEdges = edgeRDD.innerJoin(weightRDD) { (src, dst, attr, weight) =>
  s"$attr (weight: $weight)"
}
```

### EdgeRDD Factory Methods

Static methods for creating EdgeRDD instances.

```scala { .api }
object EdgeRDD {
  /**
   * Create EdgeRDD from regular RDD of edges
   * @param edges RDD of Edge objects
   */
  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDDImpl[ED, VD]
}
```

## Types

### Edge

Case class representing a directed edge with source, destination, and attribute.

```scala { .api }
/**
 * Directed edge with source vertex, destination vertex, and edge attribute
 * @param srcId Source vertex ID
 * @param dstId Destination vertex ID
 * @param attr Edge attribute of type ED
 */
case class Edge[ED](var srcId: VertexId, var dstId: VertexId, var attr: ED) {
  /**
   * Get the ID of the vertex on the other end of this edge
   * @param vid Vertex ID to compare against
   * @return The other vertex ID (dst if vid==src, src if vid==dst)
   */
  def otherVertexId(vid: VertexId): VertexId

  /**
   * Get the direction of this edge relative to a vertex
   * @param vid Vertex ID to check direction from
   * @return EdgeDirection (In if vid is destination, Out if vid is source)
   */
  def relativeDirection(vid: VertexId): EdgeDirection
}
```
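
A short sketch of the two helper methods on a single edge. It needs no `SparkContext`; the value names are illustrative only.

```scala
import org.apache.spark.graphx.{Edge, EdgeDirection}

// A directed edge from vertex 1 to vertex 2
val e = Edge(1L, 2L, "follows")

// Ask for the endpoint opposite to the one passed in
val neighborOfSrc = e.otherVertexId(1L)
val neighborOfDst = e.otherVertexId(2L)

// Direction of the edge as seen from each endpoint
val dirFromSrc = e.relativeDirection(1L) // outgoing from the source's point of view
val dirFromDst = e.relativeDirection(2L) // incoming from the destination's point of view
```

Both methods expect `vid` to be one of the edge's two endpoints; passing any other ID is not a meaningful query.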

### EdgeTriplet

Extended edge representation that includes adjacent vertex attributes.

```scala { .api }
/**
 * Edge augmented with adjacent vertex attributes
 * Extends Edge[ED] with source and destination vertex data
 */
class EdgeTriplet[VD, ED] extends Edge[ED] {
  /** Source vertex attribute */
  var srcAttr: VD

  /** Destination vertex attribute */
  var dstAttr: VD

  /**
   * Get attribute of vertex on other end of edge
   * @param vid Vertex ID to compare against
   * @return Attribute of the other vertex
   */
  def otherVertexAttr(vid: VertexId): VD

  /**
   * Get attribute of specified vertex
   * @param vid Vertex ID to get attribute for
   * @return Vertex attribute
   */
  def vertexAttr(vid: VertexId): VD

  /**
   * Convert to tuple format for compatibility
   * @return ((srcId, srcAttr), (dstId, dstAttr), edgeAttr)
   */
  def toTuple: ((VertexId, VD), (VertexId, VD), ED)
}
```

**Usage Examples:**

```scala
// Working with EdgeTriplet in practice
val graph = Graph(vertices, edges)

// Access triplets - edges with vertex data
graph.triplets.collect().foreach { triplet =>
  println(s"${triplet.srcAttr} -[${triplet.attr}]-> ${triplet.dstAttr}")

  // Use utility methods
  val otherVertex = triplet.otherVertexId(triplet.srcId) // Returns dstId
  val otherAttr = triplet.otherVertexAttr(triplet.srcId) // Returns dstAttr

  // Convert to tuple if needed
  val ((srcId, srcAttr), (dstId, dstAttr), edgeAttr) = triplet.toTuple
}
```

## Performance Considerations

### VertexRDD Optimizations

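- **Pre-indexing**: VertexRDD maintains a hash index in each partition, so join lookups by vertex ID take constant expected time
- **Reindexing**: `reindex()` rebuilds the index over only the visible vertices, which helps after a `filter` has hidden many of them; the result uses a new index and can no longer be joined quickly with the original RDD
- **Zip joins**: Prefer `leftZipJoin` and `innerZipJoin` over `leftJoin` and `innerJoin` when both sides are VertexRDDs that share an index
- **Partitioning**: VertexRDD respects graph partitioning for locality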

### EdgeRDD Optimizations

- **Columnar storage**: EdgeRDD stores edges in columnar format for cache efficiency
- **Partition co-location**: Edges are partitioned to minimize network communication
- **Bulk operations**: Prefer bulk transformations over individual edge operations

**Usage Examples:**

```scala
// Optimize VertexRDD performance
val optimizedVertices = vertexRDD
  .mapValues(_.toUpperCase) // Transform values; the existing index is preserved
  .reindex() // Build a fresh index; mainly worthwhile after filtering out many vertices

// Efficient join pattern
// otherVertexRDD is assumed to be another VertexRDD[String] defined elsewhere
val result = optimizedVertices.leftZipJoin(otherVertexRDD) { (id, a, bOpt) =>
  // Fast join using pre-built indices
  s"$a ${bOpt.getOrElse("N/A")}"
}
```
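
The reindexing advice can be illustrated with a filter that hides most vertices. This is a sketch under assumptions: a local `SparkContext`, synthetic data, and the illustrative names `allVertices`, `few`, and `compact`.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Assumption: a local context for experimentation; in spark-shell, getOrCreate returns the existing one
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("vertexrdd-reindex"))

// 1000 synthetic vertices
val allVertices = VertexRDD(sc.parallelize((1L to 1000L).map(id => (id, id.toString))))

// filter on a VertexRDD returns a VertexRDD that still carries the original index
val few = allVertices.filter { case (id, _) => id % 100 == 0 }

// reindex builds a new index covering only the remaining vertices
val compact = few.reindex()
```

Both `few` and `compact` hold the same ten vertices. The difference is the index behind them: `few` can still be zip-joined cheaply with `allVertices`, whereas `compact` trades that for a smaller index of its own.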