or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-graph-operations.md graph-algorithms.md graph-analytics.md index.md loading-utilities.md message-passing.md rdd-abstractions.md

docs/core-graph-operations.md

0

# Core Graph Operations

1

2

Fundamental graph abstractions and operations for creating, transforming, and querying property graphs with type-safe vertex and edge attributes.

3

4

## Capabilities

5

6

### Graph Abstract Class

7

8

Main graph abstraction representing an immutable property graph with arbitrary objects associated with vertices and edges.

9

10

```scala { .api }

11

/**

12

* Property graph with vertex data type VD and edge data type ED

13

* Immutable - operations return new graphs rather than modifying existing ones

14

*/

15

abstract class Graph[VD: ClassTag, ED: ClassTag] {

16

/** RDD containing vertices and their attributes */

17

val vertices: VertexRDD[VD]

18

19

/** RDD containing edges and their attributes */

20

val edges: EdgeRDD[ED]

21

22

/** RDD of edge triplets with adjacent vertex data */

23

val triplets: RDD[EdgeTriplet[VD, ED]]

24

25

/** Cache/persist the graph at specified storage level */

26

def persist(newLevel: StorageLevel): Graph[VD, ED]

27

def cache(): Graph[VD, ED]

28

def unpersist(blocking: Boolean): Graph[VD, ED]

29

def unpersistVertices(blocking: Boolean = true): Graph[VD, ED]

30

31

/** Checkpoint support for fault tolerance */

32

def checkpoint(): Unit

33

def isCheckpointed: Boolean

34

def getCheckpointFiles: Seq[String]

35

36

/** Repartition edges according to partition strategy */

37

def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]

38

def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]

39

}

40

```

41

42

**Usage Examples:**

43

44

```scala

45

import org.apache.spark.graphx._

46

import org.apache.spark.rdd.RDD

47

48

// Create vertices and edges

49

val vertices: RDD[(VertexId, String)] = sc.parallelize(Array(

50

(1L, "Alice"), (2L, "Bob"), (3L, "Charlie"), (4L, "David")

51

))

52

53

val edges: RDD[Edge[String]] = sc.parallelize(Array(

54

Edge(1L, 2L, "follows"),

55

Edge(2L, 3L, "friend"),

56

Edge(3L, 4L, "colleague")

57

))

58

59

// Create graph

60

val graph = Graph(vertices, edges)

61

62

// Cache for repeated operations

63

val cachedGraph = graph.cache()

64

65

// Access graph components

66

println(s"Vertices count: ${graph.vertices.count()}")

67

println(s"Edges count: ${graph.edges.count()}")

68

println(s"Triplets count: ${graph.triplets.count()}")

69

```

70

71

### Graph Factory Methods

72

73

Factory methods on the `Graph` companion object for creating graphs from various data sources.

74

75

```scala { .api }

76

object Graph {

77

/**

78

* Create graph from vertex and edge RDDs

79

*/

80

def apply[VD: ClassTag, ED: ClassTag](

81

vertices: RDD[(VertexId, VD)],

82

edges: RDD[Edge[ED]]

83

): Graph[VD, ED]

84

85

/**

86

* Create graph from edges RDD with default vertex value

87

*/

88

def fromEdges[VD: ClassTag, ED: ClassTag](

89

edges: RDD[Edge[ED]],

90

defaultValue: VD

91

): Graph[VD, ED]

92

93

/**

94

* Create graph from (source, destination) vertex ID tuples; every vertex gets defaultValue and every edge attribute is set to 1

95

*/

96

def fromEdgeTuples[VD: ClassTag](

97

rawEdges: RDD[(VertexId, VertexId)],

98

defaultValue: VD

99

): Graph[VD, Int]

100

}

101

```

102

103

**Usage Examples:**

104

105

```scala

106

// Create from vertices and edges

107

val graph1 = Graph(vertices, edges)

108

109

// Create from edges only (vertices created automatically)

110

val edgeOnlyGraph = Graph.fromEdges(edges, "Unknown")

111

112

// Create from simple tuples

113

val tuples: RDD[(VertexId, VertexId)] = sc.parallelize(Array(

114

(1L, 2L), (2L, 3L), (3L, 1L)

115

))

116

val simpleGraph = Graph.fromEdgeTuples(tuples, "Person")

117

```

118

119

### Graph Transformation Operations

120

121

Operations that transform graphs by modifying vertex or edge attributes.

122

123

```scala { .api }

124

/**

125

* Transform vertex attributes using a mapping function

126

*/

127

def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2): Graph[VD2, ED]

128

129

/**

130

* Transform edge attributes using a mapping function

131

*/

132

def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2]

133

134

/**

135

* Transform edge attributes using triplet information

136

*/

137

def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]

138

139

/**

140

* Reverse direction of all edges

141

*/

142

def reverse: Graph[VD, ED]

143

```

144

145

**Usage Examples:**

146

147

```scala

148

// Transform vertex attributes - add prefix

149

val prefixedGraph = graph.mapVertices((id, name) => s"User_$name")

150

151

// Transform edge attributes - convert to weights

152

val weightedGraph = graph.mapEdges(edge => edge.attr.length)

153

154

// Transform edges using triplet data

155

val annotatedGraph = graph.mapTriplets(triplet =>

156

s"${triplet.srcAttr} -> ${triplet.dstAttr}: ${triplet.attr}"

157

)

158

159

// Reverse all edges

160

val reversedGraph = graph.reverse

161

```

162

163

### Graph Filtering and Subgraph Operations

164

165

Operations that create subgraphs by filtering vertices and edges.

166

167

```scala { .api }

168

/**

169

* Create subgraph by filtering edges and vertices

170

* @param epred Edge predicate function - edges to keep

171

* @param vpred Vertex predicate function - vertices to keep

172

*/

173

def subgraph(

174

epred: EdgeTriplet[VD, ED] => Boolean = _ => true,

175

vpred: (VertexId, VD) => Boolean = (_, _) => true

176

): Graph[VD, ED]

177

178

/**

179

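* Merge parallel edges (same source and destination) into a single edge using the merge function; for correct results the graph must first be partitioned with partitionBy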

180

*/

181

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

182

183

/**

184

* Restrict graph to vertices and edges that exist in another graph

185

* Keeps vertex and edge attributes from this graph

186

*/

187

def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]

188

```

189

190

**Usage Examples:**

191

192

```scala

193

// Filter to only "friend" relationships

194

val friendsGraph = graph.subgraph(triplet => triplet.attr == "friend")

195

196

// Filter to specific users and their connections

197

val aliceFriendsGraph = graph.subgraph(

198

epred = triplet => triplet.srcAttr == "Alice" || triplet.dstAttr == "Alice",

199

vpred = (id, name) => name == "Alice" || name == "Bob"

200

)

201

202

// Group parallel edges by combining attributes (call partitionBy first so parallel edges are co-located)

203

val groupedGraph = graph.groupEdges((a, b) => s"$a,$b")

204

205

// Create masked graph using structure from another graph

206

val template: Graph[String, String] = // some other graph

207

val maskedGraph = graph.mask(template) // keeps original attributes

208

```

209

210

### Vertex Join Operations

211

212

Operations that join vertex data with external RDDs.

213

214

```scala { .api }

215

/**

216

* Join vertices with another RDD, keeping all original vertices

217

* Vertices with no matching entry in `other` are passed None as the third argument of mapFunc

218

*/

219

def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])(

220

mapFunc: (VertexId, VD, Option[U]) => VD2

221

): Graph[VD2, ED]

222

```

223

224

**Usage Examples:**

225

226

```scala

227

// Add age information to vertices

228

val ages: RDD[(VertexId, Int)] = sc.parallelize(Array(

229

(1L, 25), (2L, 30), (3L, 28)

230

))

231

232

val enrichedGraph = graph.outerJoinVertices(ages) { (id, name, ageOpt) =>

233

ageOpt match {

234

case Some(age) => s"$name ($age)"

235

case None => s"$name (age unknown)"

236

}

237

}

238

```

239

240

### Message Aggregation

241

242

Core message passing operation for implementing graph algorithms.

243

244

```scala { .api }

245

/**

246

* Aggregate messages sent along edges to compute new vertex values

247

* @param sendMsg Function to send messages along edges

248

* @param mergeMsg Function to merge multiple messages at same vertex

249

*/

250

def aggregateMessages[A: ClassTag](

251

sendMsg: EdgeContext[VD, ED, A] => Unit,

252

mergeMsg: (A, A) => A,

253

tripletFields: TripletFields = TripletFields.All

254

): VertexRDD[A]

255

```

256

257

**Usage Examples:**

258

259

```scala

260

// Count incident edges (degree) for each vertex

261

val neighborCounts = graph.aggregateMessages[Int](

262

triplet => {

263

triplet.sendToSrc(1)

264

triplet.sendToDst(1)

265

},

266

(a, b) => a + b

267

)

268

269

// Collect neighbor names

270

val neighborNames = graph.aggregateMessages[Array[String]](

271

triplet => {

272

triplet.sendToSrc(Array(triplet.dstAttr))

273

triplet.sendToDst(Array(triplet.srcAttr))

274

},

275

(a, b) => a ++ b

276

)

277

```

278

279

## Types

280

281

### EdgeContext

282

283

Context object used in message aggregation providing access to edge and vertex data.

284

285

```scala { .api }

286

abstract class EdgeContext[VD, ED, A] {

287

/** Source vertex ID */

288

def srcId: VertexId

289

290

/** Destination vertex ID */

291

def dstId: VertexId

292

293

/** Source vertex attribute */

294

def srcAttr: VD

295

296

/** Destination vertex attribute */

297

def dstAttr: VD

298

299

/** Edge attribute */

300

def attr: ED

301

302

/** Send message to source vertex */

303

def sendToSrc(msg: A): Unit

304

305

/** Send message to destination vertex */

306

def sendToDst(msg: A): Unit

307

308

/** Convert to EdgeTriplet for compatibility */

309

def toEdgeTriplet: EdgeTriplet[VD, ED]

310

}

311

```