or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-graph-operations.md, graph-algorithms.md, graph-analytics.md, index.md, loading-utilities.md, message-passing.md, rdd-abstractions.md

graph-analytics.mddocs/

0

# Graph Analytics and Metrics

1

2

Graph analytics operations for computing structural properties, degrees, neighborhoods, and graph metrics. These methods are defined on `GraphOps` and are available on any `Graph` through an implicit conversion.

3

4

## Capabilities

5

6

### Graph Metrics

7

8

Basic structural metrics and properties of the graph.

9

10

```scala { .api }

11

class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {

12

/** Total number of edges in the graph */

13

lazy val numEdges: Long

14

15

/** Total number of vertices in the graph */

16

lazy val numVertices: Long

17

18

/** In-degree of each vertex (edges pointing to vertex); vertices with no in-edges are not included in the result */

19

lazy val inDegrees: VertexRDD[Int]

20

21

/** Out-degree of each vertex (edges originating from vertex); vertices with no out-edges are not included in the result */

22

lazy val outDegrees: VertexRDD[Int]

23

24

/** Total degree of each vertex (in-degree + out-degree); vertices with no edges are not included in the result */

25

lazy val degrees: VertexRDD[Int]

26

}

27

```

28

29

**Usage Examples:**

30

31

```scala

32

import org.apache.spark.graphx._

33

34

// Basic graph metrics

35

println(s"Graph has ${graph.numVertices} vertices")

36

println(s"Graph has ${graph.numEdges} edges")

37

38

// Vertex degrees

39

val inDegreeStats = graph.inDegrees.map(_._2).stats()

40

val outDegreeStats = graph.outDegrees.map(_._2).stats()

41

42

println(s"Average in-degree: ${inDegreeStats.mean}")

43

println(s"Max out-degree: ${outDegreeStats.max}")

44

45

// Find vertices with highest degree

46

val highestDegreeVertex = graph.degrees.reduce((a, b) => if (a._2 > b._2) a else b)

47

println(s"Highest degree vertex: ${highestDegreeVertex._1}, degree: ${highestDegreeVertex._2}")

48

```

49

50

### Neighborhood Collection

51

52

Operations to collect neighboring vertices and edges for each vertex.

53

54

```scala { .api }

55

/**

56

* Collect vertex IDs of neighboring vertices

57

* @param edgeDirection Direction of edges to follow (In, Out, or Either; EdgeDirection.Both is not supported and throws a SparkException)

58

*/

59

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

60

61

/**

62

* Collect neighboring vertices with their attributes

63

* @param edgeDirection Direction of edges to follow (In, Out, or Either; EdgeDirection.Both is not supported and throws a SparkException)

64

*/

65

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

66

67

/**

68

* Collect incident edges for each vertex

69

* @param edgeDirection Direction of edges to collect (In, Out, or Either; EdgeDirection.Both is not supported and throws a SparkException)

70

*/

71

def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]]

72

```

73

74

**Usage Examples:**

75

76

```scala

77

// Collect outgoing neighbor IDs for each vertex

78

val outNeighbors = graph.collectNeighborIds(EdgeDirection.Out)

79

outNeighbors.collect().foreach { case (vertexId, neighbors) =>

80

println(s"Vertex $vertexId has outgoing neighbors: ${neighbors.mkString(", ")}")

81

}

82

83

// Collect incoming neighbors with attributes

84

val inNeighborsWithData = graph.collectNeighbors(EdgeDirection.In)

85

inNeighborsWithData.collect().foreach { case (vertexId, neighbors) =>

86

val neighborNames = neighbors.map(_._2).mkString(", ")

87

println(s"Vertex $vertexId has incoming neighbors: $neighborNames")

88

}

89

90

// Collect incident edges for each vertex

91

val outgoingEdges = graph.collectEdges(EdgeDirection.Out)

92

outgoingEdges.collect().foreach { case (vertexId, edges) =>

93

println(s"Vertex $vertexId has ${edges.length} outgoing edges")

94

}

95

96

// Collect all incident edges

97

val allEdges = graph.collectEdges(EdgeDirection.Either)

98

allEdges.collect().foreach { case (vertexId, edges) =>

99

println(s"Vertex $vertexId has ${edges.length} incident edges")

100

}

101

```

102

103

### Graph Cleaning Operations

104

105

Operations to clean and normalize graph structure.

106

107

```scala { .api }

108

/**

109

* Remove self-edges (edges where source equals destination)

110

*/

111

def removeSelfEdges(): Graph[VD, ED]

112

113

/**

114

* Convert to canonical edge format: rewrite every edge so that srcId < dstId, then merge the resulting duplicate edges

115

* @param mergeFunc Function to merge duplicate edge attributes

116

*/

117

def convertToCanonicalEdges(mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED]

118

```

119

120

**Usage Examples:**

121

122

```scala

123

// Remove self-loops from graph

124

val noSelfLoopsGraph = graph.removeSelfEdges()

125

126

// Convert to canonical form, merging parallel edges

127

val canonicalGraph = graph.convertToCanonicalEdges((a, b) => s"$a;$b")

128

```

129

130

### Vertex Operations

131

132

Operations for joining and transforming vertex data.

133

134

```scala { .api }

135

/**

136

* Join vertices with external data; vertices without a matching entry in the table keep their original attribute (no vertex is dropped)

137

* @param table External RDD to join with

138

* @param mapFunc Function to combine original and joined data

139

*/

140

def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(

141

mapFunc: (VertexId, VD, U) => VD

142

): Graph[VD, ED]

143

144

/**

145

* Pick a random vertex ID from the graph

146

*/

147

def pickRandomVertex(): VertexId

148

```

149

150

**Usage Examples:**

151

152

```scala

153

// Join with external user data

154

val userProfiles: RDD[(VertexId, String)] = sc.parallelize(Array(

155

(1L, "Engineer"), (2L, "Designer"), (3L, "Manager")

156

))

157

158

val enrichedGraph = graph.joinVertices(userProfiles) { (id, name, profession) =>

159

s"$name - $profession"

160

}

161

162

// Pick random vertex for sampling

163

val randomVertex = graph.pickRandomVertex()

164

println(s"Random vertex selected: $randomVertex")

165

```

166

167

### Advanced Analytics Operations

168

169

Higher-level analytics operations and filtering.

170

171

```scala { .api }

172

/**

173

* Apply preprocessing to graph and filter result

174

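* Returns a subgraph of the original graph with its original vertex and edge attributes; the preprocessed graph is only used to decide which vertices and edges to keep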

175

*/

176

def filter[VD2: ClassTag, ED2: ClassTag](

177

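preprocess: Graph[VD, ED] => Graph[VD2, ED2],
epred: EdgeTriplet[VD2, ED2] => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true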

178

): Graph[VD, ED]

179

```

180

181

### Algorithm Integration Methods

182

183

Direct access to graph algorithms through GraphOps.

184

185

```scala { .api }

186

/**

187

* Run PageRank until convergence

188

* @param tol Convergence tolerance

189

* @param resetProb Random reset probability (default 0.15)

190

*/

191

def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

192

193

/**

194

* Personalized PageRank from specific source vertex

195

* @param src Source vertex for personalization

196

* @param tol Convergence tolerance

197

* @param resetProb Random reset probability

198

*/

199

def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

200

201

/**

202

* Static PageRank with fixed iterations

203

* @param numIter Number of iterations

204

* @param resetProb Random reset probability

205

*/

206

def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

207

208

/**

209

* Connected components algorithm

210

*/

211

def connectedComponents(): Graph[VertexId, ED]

212

213

/**

214

* Triangle counting algorithm

215

*/

216

def triangleCount(): Graph[Int, ED]

217

218

/**

219

* Strongly connected components with iteration limit

220

* @param numIter Maximum number of iterations

221

*/

222

def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

223

```

224

225

**Usage Examples:**

226

227

```scala

228

// Run PageRank analysis

229

val pageRankResults = graph.pageRank(0.0001)

230

val topVertices = pageRankResults.vertices.top(5)(Ordering.by(_._2))

231

println("Top 5 vertices by PageRank:")

232

topVertices.foreach { case (id, rank) => println(s"Vertex $id: $rank") }

233

234

// Find connected components

235

val components = graph.connectedComponents()

236

val componentSizes = components.vertices.map(_._2).countByValue()

237

println(s"Found ${componentSizes.size} connected components")

238

239

// Count triangles

240

val triangleCounts = graph.triangleCount()

241

val totalTriangles = triangleCounts.vertices.map(_._2).sum() / 3

242

println(s"Total triangles in graph: $totalTriangles")

243

244

// Personalized PageRank from specific vertex

245

val personalizedPR = graph.personalizedPageRank(1L, 0.001)

246

val personalizedScores = personalizedPR.vertices.collect()

247

personalizedScores.foreach { case (id, score) =>

248

println(s"Personalized PageRank from vertex 1 to vertex $id: $score")

249

}

250

```

251

252

### Pregel Integration

253

254

Direct access to Pregel computation framework.

255

256

```scala { .api }

257

/**

258

* Run Pregel computation using vertex-centric programming model

259

* @param initialMsg Initial message sent to all vertices

260

* @param maxIterations Maximum number of iterations

261

* @param activeDirection Direction of edges, relative to a vertex that received a message in the previous round, along which sendMsg is run

262

* @param vprog Vertex program - how vertices update their state

263

* @param sendMsg Edge program - what messages to send along edges

264

* @param mergeMsg Message merge function

265

*/

266

def pregel[A: ClassTag](

267

initialMsg: A,

268

maxIterations: Int = Int.MaxValue,

269

activeDirection: EdgeDirection = EdgeDirection.Either

270

)(

271

vprog: (VertexId, VD, A) => VD,

272

sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],

273

mergeMsg: (A, A) => A

274

): Graph[VD, ED]

275

```

276

277

**Usage Examples:**

278

279

```scala

280

// Implement single-source shortest paths using Pregel

281

val sourceId: VertexId = 1L

282

283

// Initialize distances: 0 for source, infinity for others

284

val initialGraph = graph.mapVertices((id, _) =>

285

if (id == sourceId) 0.0 else Double.PositiveInfinity

286

)

287

288

val shortestPaths = initialGraph.pregel(Double.PositiveInfinity)(

289

// Vertex program: update distance if better path found

290

(id, dist, newDist) => math.min(dist, newDist),

291

292

// Send message: offer the destination a distance one hop longer than the source's (every edge is treated as weight 1.0 here)

293

triplet => {

294

if (triplet.srcAttr + 1.0 < triplet.dstAttr) {

295

Iterator((triplet.dstId, triplet.srcAttr + 1.0))

296

} else {

297

Iterator.empty

298

}

299

},

300

301

// Merge messages: take minimum distance

302

(a, b) => math.min(a, b)

303

)

304

```

305

306

## Types

307

308

### EdgeDirection

309

310

Enumeration for specifying edge directions in neighborhood operations.

311

312

```scala { .api }

313

sealed abstract class EdgeDirection {

314

def reverse: EdgeDirection

315

}

316

317

object EdgeDirection {

318

/** Edges arriving at a vertex (incoming edges) */

319

case object In extends EdgeDirection

320

321

/** Edges originating from a vertex (outgoing edges) */

322

case object Out extends EdgeDirection

323

324

/** Edges in either direction (incoming OR outgoing) */

325

case object Either extends EdgeDirection

326

327

/** Edges in both directions (incoming AND outgoing) */

328

case object Both extends EdgeDirection

329

}

330

```