or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-graph-operations.md · graph-algorithms.md · graph-analytics.md · index.md · loading-utilities.md · message-passing.md · rdd-abstractions.md

graph-algorithms.mddocs/

0

# Graph Algorithms Library

1

2

Pre-implemented graph algorithms for common analytics tasks including centrality measures, community detection, path finding, and structural analysis.

3

4

## Capabilities

5

6

### PageRank Algorithm

7

8

PageRank algorithm implementation with multiple variants for computing vertex importance scores.

9

10

```scala { .api }

11

/**

12

* PageRank algorithm implementations

13

*/

14

object PageRank {

15

/**

16

* Static PageRank with fixed number of iterations

17

* @param graph Input graph

18

* @param numIter Number of iterations to run

19

* @param resetProb Random reset probability (default 0.15)

20

* @return Graph with PageRank scores as vertex attributes

21

*/

22

def run[VD: ClassTag, ED: ClassTag](

23

graph: Graph[VD, ED],

24

numIter: Int,

25

resetProb: Double = 0.15

26

): Graph[Double, Double]

27

28

/**

29

* PageRank until convergence based on tolerance

30

* @param graph Input graph

31

* @param tol Convergence tolerance

32

* @param resetProb Random reset probability (default 0.15)

33

* @return Graph with PageRank scores

34

*/

35

def runUntilConvergence[VD: ClassTag, ED: ClassTag](

36

graph: Graph[VD, ED],

37

tol: Double,

38

resetProb: Double = 0.15

39

): Graph[Double, Double]

40

41

/**

42

* PageRank with additional options including personalization

43

* @param graph Input graph

44

* @param numIter Number of iterations

45

* @param resetProb Random reset probability

46

* @param srcId Optional source vertex for personalized PageRank

47

* @return Graph with PageRank scores

48

*/

49

def runWithOptions[VD: ClassTag, ED: ClassTag](

50

graph: Graph[VD, ED],

51

numIter: Int,

52

resetProb: Double = 0.15,

53

srcId: Option[VertexId] = None

54

): Graph[Double, Double]

55

56

/**

57

* Parallel personalized PageRank from multiple sources

58

* @param graph Input graph

59

* @param numIter Number of iterations

60

* @param resetProb Random reset probability

61

* @param sources Array of source vertices

62

* @return Graph with vector of personalized scores

63

*/

64

def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](

65

graph: Graph[VD, ED],

66

numIter: Int,

67

resetProb: Double,

68

sources: Array[VertexId]

69

): Graph[Vector, Double]

70

71

/**

72

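* Static personalized PageRank with fixed iterations from a single source (NOTE: in Apache Spark GraphX this is exposed through GraphOps, i.e. called as graph.staticPersonalizedPageRank(src, numIter, resetProb), and delegates to PageRank.runWithOptions)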

73

* @param src Source vertex for personalization

74

*/

75

def staticPersonalizedPageRank[VD: ClassTag, ED: ClassTag](

76

graph: Graph[VD, ED],

77

src: VertexId,

78

numIter: Int,

79

resetProb: Double = 0.15

80

): Graph[Double, Double]

81

82

/**

83

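* Static parallel personalized PageRank from multiple sources with fixed iterations (NOTE: in Apache Spark GraphX this is exposed through GraphOps, i.e. called as graph.staticParallelPersonalizedPageRank(sources, numIter, resetProb), and delegates to PageRank.runParallelPersonalizedPageRank)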

84

* @param sources Array of source vertices

85

*/

86

def staticParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](

87

graph: Graph[VD, ED],

88

sources: Array[VertexId],

89

numIter: Int,

90

resetProb: Double = 0.15

91

): Graph[Vector, Double]

92

}

93

```

94

95

**Usage Examples:**

96

97

```scala

98

import org.apache.spark.graphx._

99

import org.apache.spark.graphx.lib._

100

101

// Basic PageRank with fixed iterations

102

val pageRankGraph = PageRank.run(graph, numIter = 10)

103

val topVertices = pageRankGraph.vertices.top(5)(Ordering.by(_._2))

104

println("Top 5 vertices by PageRank:")

105

topVertices.foreach { case (id, rank) => println(s"Vertex $id: $rank") }

106

107

// PageRank until convergence

108

val convergedPR = PageRank.runUntilConvergence(graph, tol = 0.0001)

109

110

// Personalized PageRank from specific vertex

111

val personalizedPR = PageRank.runWithOptions(graph, numIter = 10, srcId = Some(1L))

112

113

// Parallel personalized PageRank from multiple sources

114

val sources = Array(1L, 2L, 3L)

115

val parallelPPR = PageRank.runParallelPersonalizedPageRank(graph, 10, 0.15, sources)

116

117

// Static personalized PageRank with fixed iterations

118

val staticPPR = graph.staticPersonalizedPageRank(src = 1L, numIter = 5)

119

120

// Static parallel personalized PageRank with fixed iterations

121

val staticParallelPPR = graph.staticParallelPersonalizedPageRank(sources, numIter = 5)

122

```

123

124

### Connected Components

125

126

Algorithm to find connected components in undirected graphs.

127

128

```scala { .api }

129

/**

130

* Connected components algorithm

131

*/

132

object ConnectedComponents {

133

/**

134

* Find connected components (runs until convergence)

135

* @param graph Input graph (treated as undirected)

136

* @return Graph where vertex values are component IDs

137

*/

138

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]

139

140

/**

141

* Find connected components with iteration limit

142

* @param graph Input graph

143

* @param maxIterations Maximum number of iterations

144

* @return Graph with component IDs

145

*/

146

def run[VD: ClassTag, ED: ClassTag](

147

graph: Graph[VD, ED],

148

maxIterations: Int

149

): Graph[VertexId, ED]

150

}

151

```

152

153

**Usage Examples:**

154

155

```scala

156

// Find all connected components

157

val components = ConnectedComponents.run(graph)

158

159

// Analyze component sizes

160

val componentSizes = components.vertices

161

.map(_._2) // Get component IDs

162

.countByValue() // Count vertices per component

163

.toSeq.sortBy(-_._2) // Sort by size descending

164

165

println(s"Found ${componentSizes.size} connected components:")

166

componentSizes.take(5).foreach { case (componentId, size) =>

167

println(s"Component $componentId: $size vertices")

168

}

169

170

// Find vertices in largest component

171

val largestComponent = componentSizes.head._1

172

val largestComponentVertices = components.vertices

173

.filter(_._2 == largestComponent)

174

.keys.collect()

175

176

println(s"Largest component contains vertices: ${largestComponentVertices.mkString(", ")}")

177

178

// Connected components with iteration limit

179

val limitedComponents = ConnectedComponents.run(graph, maxIterations = 5)

180

```

181

182

### Triangle Counting

183

184

Algorithm to count triangles in the graph for clustering analysis.

185

186

```scala { .api }

187

/**

188

* Triangle counting algorithm

189

*/

190

object TriangleCount {

191

/**

192

* Count triangles passing through each vertex

193

* Graph must be partitioned with RandomVertexCut or CanonicalRandomVertexCut

194

* @param graph Input graph

195

* @return Graph where vertex values are triangle counts

196

*/

197

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]

198

199

/**

200

* Count triangles on pre-canonicalized graph (edges in canonical form)

201

* More efficient if graph is already in canonical form

202

* @param graph Input graph with canonical edges

203

* @return Graph with triangle counts

204

*/

205

def runPreCanonicalized[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]

206

}

207

```

208

209

**Usage Examples:**

210

211

```scala

212

// Ensure proper partitioning for triangle counting

213

val partitionedGraph = graph.partitionBy(PartitionStrategy.RandomVertexCut)

214

215

// Count triangles

216

val triangleCounts = TriangleCount.run(partitionedGraph)

217

val totalTriangles = triangleCounts.vertices.map(_._2).sum() / 3 // Each triangle counted 3 times

218

219

println(s"Total triangles in graph: $totalTriangles")

220

221

// Find vertices with most triangles

222

val topTriangleVertices = triangleCounts.vertices.top(10)(Ordering.by(_._2))

223

println("Vertices with most triangles:")

224

topTriangleVertices.foreach { case (id, count) =>

225

println(s"Vertex $id: $count triangles")

226

}

227

228

// Calculate clustering coefficient

229

val degrees = graph.degrees

230

val clustering = triangleCounts.vertices.innerJoin(degrees) { (id, triangles, degree) =>

231

if (degree > 1) 2.0 * triangles / (degree * (degree - 1)) else 0.0

232

}

233

234

val avgClustering = clustering.map(_._2).mean()

235

println(s"Average clustering coefficient: $avgClustering")

236

```

237

238

### Strongly Connected Components

239

240

Algorithm to find strongly connected components in directed graphs.

241

242

```scala { .api }

243

/**

244

* Strongly connected components algorithm

245

*/

246

object StronglyConnectedComponents {

247

/**

248

* Find strongly connected components with fixed iterations

249

* @param graph Input directed graph

250

* @param numIter Number of iterations to run

251

* @return Graph where vertex values are SCC IDs

252

*/

253

def run[VD: ClassTag, ED: ClassTag](

254

graph: Graph[VD, ED],

255

numIter: Int

256

): Graph[VertexId, ED]

257

}

258

```

259

260

**Usage Examples:**

261

262

```scala

263

// Find strongly connected components

264

val sccGraph = StronglyConnectedComponents.run(graph, numIter = 10)

265

266

// Analyze SCC structure

267

val sccSizes = sccGraph.vertices

268

.map(_._2)

269

.countByValue()

270

.toSeq.sortBy(-_._2)

271

272

println(s"Found ${sccSizes.size} strongly connected components:")

273

sccSizes.take(5).foreach { case (sccId, size) =>

274

println(s"SCC $sccId: $size vertices")

275

}

276

277

// Find vertices in same SCC as a specific vertex

278

val targetVertex = 1L

279

val targetSCC = sccGraph.vertices.filter(_._1 == targetVertex).map(_._2).first()

280

val sameSCCVertices = sccGraph.vertices

281

.filter(_._2 == targetSCC)

282

.keys.collect()

283

284

println(s"Vertices in same SCC as vertex $targetVertex: ${sameSCCVertices.mkString(", ")}")

285

```

286

287

### Label Propagation

288

289

Community detection algorithm using label propagation.

290

291

```scala { .api }

292

/**

293

* Label propagation algorithm for community detection

294

*/

295

object LabelPropagation {

296

/**

297

* Run label propagation community detection

298

* @param graph Input graph (treated as undirected)

299

* @param maxSteps Maximum number of propagation steps

300

* @return Graph where vertex values are community labels

301

*/

302

def run[VD, ED: ClassTag](graph: Graph[VD, ED], maxSteps: Int): Graph[VertexId, ED]

303

}

304

```

305

306

**Usage Examples:**

307

308

```scala

309

// Detect communities using label propagation

310

val communities = LabelPropagation.run(graph, maxSteps = 5)

311

312

// Analyze community structure

313

val communitySizes = communities.vertices

314

.map(_._2)

315

.countByValue()

316

.toSeq.sortBy(-_._2)

317

318

println(s"Detected ${communitySizes.size} communities:")

319

communitySizes.take(10).foreach { case (communityId, size) =>

320

println(s"Community $communityId: $size vertices")

321

}

322

323

// Calculate modularity or other community quality measures

324

val communityMemberships = communities.vertices.collectAsMap()

325

```

326

327

### Shortest Paths

328

329

Shortest paths from every vertex to a given set of landmark vertices, measured in hops (unweighted edges).

330

331

```scala { .api }

332

/**

333

* Shortest paths algorithm

334

*/

335

object ShortestPaths {

336

/** Type alias for shortest path maps */

337

type SPMap = Map[VertexId, Int]

338

339

/**

340

* Shortest paths from every vertex to the specified landmark vertices

341

* @param graph Input graph (edge weights assumed to be 1)

342

* @param landmarks Set of landmark vertices to find paths to

343

* @return Graph where vertices contain maps of distances to landmarks

344

*/

345

def run[VD, ED: ClassTag](

346

graph: Graph[VD, ED],

347

landmarks: Seq[VertexId]

348

): Graph[SPMap, ED]

349

}

350

```

351

352

**Usage Examples:**

353

354

```scala

355

// Find shortest paths to landmark vertices

356

val landmarks = Seq(1L, 5L, 10L)

357

val shortestPaths = ShortestPaths.run(graph, landmarks)

358

359

// Extract distances to landmarks

360

shortestPaths.vertices.collect().foreach { case (vertexId, pathMap) =>

361

println(s"Vertex $vertexId distances:")

362

pathMap.foreach { case (landmark, distance) =>

363

println(s" To landmark $landmark: $distance hops")

364

}

365

}

366

367

// Find vertices closest to specific landmark

368

val targetLandmark = 1L

369

val closestToLandmark = shortestPaths.vertices

370

.map { case (id, pathMap) => (id, pathMap.getOrElse(targetLandmark, Int.MaxValue)) }

371

.filter(_._2 < Int.MaxValue)

372

.takeOrdered(5)(Ordering.by(_._2))

373

374

println(s"Closest vertices to landmark $targetLandmark:")

375

closestToLandmark.foreach { case (id, distance) =>

376

println(s"Vertex $id: distance $distance")

377

}

378

```

379

380

### SVD++

381

382

Collaborative filtering algorithm implementation.

383

384

```scala { .api }

385

/**

386

* SVD++ algorithm for collaborative filtering and matrix factorization

387

* Implementation based on "Factorization Meets the Neighborhood" paper

388

* Typically used on bipartite graphs (users-items)

389

*/

390

object SVDPlusPlus {

391

/**

392

* Configuration parameters for SVD++ algorithm

393

*/

394

class Conf(

395

var rank: Int, // Number of latent factors

396

var maxIters: Int, // Maximum iterations

397

var minVal: Double, // Minimum rating value

398

var maxVal: Double, // Maximum rating value

399

var gamma1: Double, // Learning rate for the bias terms

400

var gamma2: Double, // Learning rate for the latent factor vectors

401

var gamma6: Double, // Regularization coefficient for the bias terms

402

var gamma7: Double // Regularization coefficient for the latent factor vectors

403

) extends Serializable

404

405

/**

406

* Run SVD++ algorithm on rating graph

407

* @param edges Rating edges with Double attributes

408

* @param conf Algorithm configuration parameters

409

* @return Tuple of (trained model graph, global mean rating of the input edges)

410

*/

411

def run(edges: RDD[Edge[Double]], conf: Conf):

412

(Graph[(Array[Double], Array[Double], Double, Double), Double], Double)

413

}

414

```

415

416

## Algorithm Integration Patterns

417

418

### Combining Multiple Algorithms

419

420

```scala

421

// Example: Comprehensive graph analysis pipeline

422

val graph = loadGraph()

423

424

// 1. Basic metrics

425

val metrics = (graph.numVertices, graph.numEdges,

426

graph.degrees.map(_._2).mean())

427

println(s"Graph: ${metrics._1} vertices, ${metrics._2} edges, avg degree ${metrics._3}")

428

429

// 2. Structural analysis

430

val components = ConnectedComponents.run(graph)

431

val triangles = TriangleCount.run(graph.partitionBy(PartitionStrategy.RandomVertexCut))

432

433

// 3. Centrality analysis

434

val pagerank = PageRank.runUntilConvergence(graph, 0.0001)

435

val centralVertices = pagerank.vertices.top(10)(Ordering.by(_._2))

436

437

// 4. Community detection

438

val communities = LabelPropagation.run(graph, 10)

439

440

// 5. Path analysis

441

val landmarks = centralVertices.take(3).map(_._1)

442

val paths = ShortestPaths.run(graph, landmarks)

443

444

// Combine results for comprehensive analysis

445

val analysis = graph.vertices.innerJoin(pagerank.vertices) { (id, orig, pr) =>

446

(orig, pr)

447

}.innerJoin(components.vertices) { case (id, (orig, pr), comp) =>

448

(orig, pr, comp)

449

}.innerJoin(communities.vertices) { case (id, (orig, pr, comp), comm) =>

450

(orig, pr, comp, comm)

451

}

452

```

453

454

### Custom Algorithm Implementation

455

456

```scala

457

// Template for implementing custom algorithms using existing primitives

458

def myCustomAlgorithm[VD: ClassTag, ED: ClassTag](

459

graph: Graph[VD, ED],

460

maxIter: Int

461

): Graph[Double, ED] = {

462

463

// Initialize vertex values

464

var g = graph.mapVertices((id, attr) => 0.0)

465

466

// Iterative computation

467

for (i <- 0 until maxIter) {

468

// Use aggregateMessages for computation

469

val newValues = g.aggregateMessages[Double](

470

triplet => {

471

// Send messages based on current state

472

triplet.sendToSrc(triplet.dstAttr + 1.0)

473

triplet.sendToDst(triplet.srcAttr + 1.0)

474

},

475

(a, b) => a + b // Merge messages

476

)

477

478

// Update vertex values

479

g = g.outerJoinVertices(newValues) { (id, oldValue, msgOpt) =>

480

msgOpt.getOrElse(oldValue)

481

}

482

}

483

484

g

485

}

486

```

487

488

## Performance Considerations

489

490

### Algorithm-Specific Optimizations

491

492

- **PageRank**: Use `runUntilConvergence` for accuracy, `run` with fixed iterations for predictable runtime

493

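- **Triangle Counting**: Expects a graph that has been partitioned with `partitionBy` (for example `RandomVertexCut` or `CanonicalRandomVertexCut`); use `runPreCanonicalized` only when the edges are already in canonical form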

494

- **Connected Components**: May require many iterations on large graphs with long paths

495

- **Label Propagation**: Fast but non-deterministic; results may vary between runs

496

- **Shortest Paths**: Memory usage grows with number of landmarks

497

498

### General Algorithm Guidelines

499

500

```scala

501

// Optimize graph before running algorithms

502

val optimizedGraph = graph

503

.partitionBy(PartitionStrategy.EdgePartition2D) // Good general partitioning

504

.cache() // Cache for repeated use

505

506

// Run multiple algorithms on same cached graph

507

val pagerank = PageRank.run(optimizedGraph, 10)

508

val components = ConnectedComponents.run(optimizedGraph)

509

val triangles = TriangleCount.run(optimizedGraph)

510

```