or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-graph-api.md, graph-algorithms.md, index.md, pregel-api.md, utilities.md

docs/graph-algorithms.md

0

# Graph Algorithms

1

2

Comprehensive collection of pre-implemented graph algorithms including PageRank, Connected Components, Triangle Counting, and community detection algorithms optimized for distributed execution.

3

4

## Capabilities

5

6

### PageRank Algorithm

7

8

Compute PageRank scores using both static (fixed iterations) and dynamic (convergence-based) implementations.

9

10

```scala { .api }

11

/**

12

* Run PageRank until convergence

13

* @param tol Tolerance for convergence (change in rank below this stops)

14

* @param resetProb Probability of random jump (damping factor = 1 - resetProb)

15

* @returns Graph with PageRank scores as vertex attributes

16

*/

17

def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

18

19

/**

20

* Run PageRank for fixed number of iterations

21

* @param numIter Number of iterations to run

22

* @param resetProb Probability of random jump

23

* @returns Graph with PageRank scores as vertex attributes

24

*/

25

def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

26

27

/**

28

* Run personalized PageRank from a source vertex

29

* @param src Source vertex for personalized PageRank

30

* @param tol Tolerance for convergence

31

* @param resetProb Probability of jumping back to source

32

* @returns Graph with personalized PageRank scores

33

*/

34

def personalizedPageRank(

35

src: VertexId,

36

tol: Double,

37

resetProb: Double = 0.15

38

): Graph[Double, Double]

39

40

object PageRank {

41

/**

42

* Static PageRank implementation

43

* @param graph Input graph

44

* @param numIter Number of iterations

45

* @param resetProb Random jump probability

46

* @returns Graph with PageRank scores

47

*/

48

def run[VD: ClassTag, ED: ClassTag](

49

graph: Graph[VD, ED],

50

numIter: Int,

51

resetProb: Double = 0.15

52

): Graph[Double, Double]

53

54

/**

55

* Dynamic PageRank until convergence

56

* @param graph Input graph

57

* @param tol Convergence tolerance

58

* @param resetProb Random jump probability

59

* @returns Graph with PageRank scores

60

*/

61

def runUntilConvergence[VD: ClassTag, ED: ClassTag](

62

graph: Graph[VD, ED],

63

tol: Double,

64

resetProb: Double = 0.15

65

): Graph[Double, Double]

66

67

/**

68

* PageRank with additional options

69

* @param graph Input graph

70

* @param numIter Number of iterations

71

* @param resetProb Random jump probability

72

* @param srcId Optional source vertex for personalized PageRank

73

* @returns Graph with PageRank scores

74

*/

75

def runWithOptions[VD: ClassTag, ED: ClassTag](

76

graph: Graph[VD, ED],

77

numIter: Int,

78

resetProb: Double,

79

srcId: Option[VertexId] = None

80

): Graph[Double, Double]

81

82

/**

83

* Parallel personalized PageRank for multiple sources

84

* @param graph Input graph

85

* @param numIter Number of iterations

86

* @param resetProb Random jump probability

87

* @param sources Set of source vertices

88

* @returns Graph with Vector of personalized PageRank scores

89

*/

90

def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](

91

graph: Graph[VD, ED],

92

numIter: Int,

93

resetProb: Double = 0.15,

94

sources: Array[VertexId]

95

): Graph[Vector, Double]

96

}

97

```

98

99

**Usage Examples:**

100

101

```scala

102

import org.apache.spark.graphx._

103

import org.apache.spark.graphx.lib._

104

105

// Dynamic PageRank until convergence

106

val ranks = graph.pageRank(0.0001).vertices

107

ranks.collect.foreach { case (id, rank) =>

108

println(s"Vertex $id has rank $rank")

109

}

110

111

// Static PageRank for 10 iterations

112

val staticRanks = graph.staticPageRank(10).vertices

113

114

// Personalized PageRank from vertex 1

115

val personalizedRanks = graph.personalizedPageRank(1L, 0.001).vertices

116

117

// Using PageRank object directly

118

val pageRankGraph = PageRank.run(graph, numIter = 20, resetProb = 0.1)

119

```

120

121

### Connected Components

122

123

Find connected components in undirected graphs using efficient label propagation.

124

125

```scala { .api }

126

/**

127

* Find connected components (assumes undirected graph)

128

* @returns Graph where each vertex has the smallest vertex ID in its component

129

*/

130

def connectedComponents(): Graph[VertexId, ED]

131

132

object ConnectedComponents {

133

/**

134

* Find connected components in a graph

135

* @param graph Input graph (treated as undirected)

136

* @returns Graph with component IDs as vertex attributes

137

*/

138

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED]

139

140

/**

141

* Find connected components with iteration limit

142

* @param graph Input graph

143

* @param maxIterations Maximum number of iterations

144

* @returns Graph with component IDs

145

*/

146

def run[VD: ClassTag, ED: ClassTag](

147

graph: Graph[VD, ED],

148

maxIterations: Int

149

): Graph[VertexId, ED]

150

}

151

```

152

153

### Strongly Connected Components

154

155

Find strongly connected components in directed graphs using iterative algorithms.

156

157

```scala { .api }

158

/**

159

* Compute strongly connected components

160

* @param numIter Number of iterations to run

161

* @returns Graph where each vertex has the smallest vertex ID in its SCC

162

*/

163

def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]

164

165

object StronglyConnectedComponents {

166

/**

167

* Find strongly connected components in directed graph

168

* @param graph Input directed graph

169

* @param numIter Number of iterations

170

* @returns Graph with SCC IDs as vertex attributes

171

*/

172

def run[VD: ClassTag, ED: ClassTag](

173

graph: Graph[VD, ED],

174

numIter: Int

175

): Graph[VertexId, ED]

176

}

177

```

178

179

### Triangle Counting

180

181

Count triangles in graphs for clustering coefficient computation and social network analysis.

182

183

```scala { .api }

184

/**

185

* Count triangles passing through each vertex

186

* @returns Graph with triangle counts as vertex attributes

187

*/

188

def triangleCount(): Graph[Int, ED]

189

190

object TriangleCount {

191

/**

192

* Count triangles (the input graph is canonicalized automatically before counting)

193

* @param graph Input graph

194

* @returns Graph with triangle counts per vertex

195

*/

196

def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED]

197

198

/**

199

* Count triangles assuming graph is already in canonical form

200

* @param graph Pre-canonicalized graph

201

* @returns Graph with triangle counts

202

*/

203

def runPreCanonicalized[VD: ClassTag, ED: ClassTag](

204

graph: Graph[VD, ED]

205

): Graph[Int, ED]

206

}

207

```

208

209

**Usage Examples:**

210

211

```scala

212

// Find connected components

213

val components = graph.connectedComponents().vertices

214

components.collect.foreach { case (id, component) =>

215

println(s"Vertex $id belongs to component $component")

216

}

217

218

// Count triangles

219

val triangles = graph.triangleCount().vertices

220

triangles.collect.foreach { case (id, count) =>

221

println(s"Vertex $id participates in $count triangles")

222

}

223

224

// Strongly connected components

225

val scc = graph.stronglyConnectedComponents(10).vertices

226

```

227

228

### Label Propagation

229

230

Community detection algorithm using iterative label propagation for clustering and community structure discovery.

231

232

```scala { .api }

233

object LabelPropagation {

234

/**

235

* Run label propagation algorithm for community detection

236

* @param graph Input graph

237

* @param maxSteps Maximum number of propagation steps

238

* @returns Graph with community labels as vertex attributes

239

*/

240

def run[VD: ClassTag, ED: ClassTag](

241

graph: Graph[VD, ED],

242

maxSteps: Int

243

): Graph[VertexId, ED]

244

}

245

```

246

247

**Usage Examples:**

248

249

```scala

250

import org.apache.spark.graphx.lib.LabelPropagation

251

252

// Community detection using label propagation

253

val communities = LabelPropagation.run(graph, maxSteps = 5).vertices

254

communities.collect.foreach { case (id, community) =>

255

println(s"Vertex $id belongs to community $community")

256

}

257

```

258

259

### Shortest Paths

260

261

Compute shortest paths from vertices to a set of landmark vertices using breadth-first search.

262

263

```scala { .api }

264

object ShortestPaths {

265

/** Map from landmark vertex ID to shortest distance */

266

type SPMap = Map[VertexId, Int]

267

268

/**

269

* Compute shortest paths to landmark vertices

270

* @param graph Input graph (edge weights ignored, all edges have weight 1)

271

* @param landmarks Set of landmark vertex IDs

272

* @returns Graph with shortest distance maps as vertex attributes

273

*/

274

def run[VD: ClassTag, ED: ClassTag](

275

graph: Graph[VD, ED],

276

landmarks: Seq[VertexId]

277

): Graph[SPMap, ED]

278

}

279

```

280

281

**Usage Examples:**

282

283

```scala

284

import org.apache.spark.graphx.lib.ShortestPaths

285

286

// Compute shortest paths to landmarks 1, 2, 3

287

val landmarks = Seq(1L, 2L, 3L)

288

val distances = ShortestPaths.run(graph, landmarks).vertices

289

290

distances.collect.foreach { case (id, distanceMap) =>

291

println(s"Vertex $id distances: $distanceMap")

292

}

293

```

294

295

### SVD++ Collaborative Filtering

296

297

Matrix factorization algorithm for recommendation systems and collaborative filtering.

298

299

```scala { .api }

300

object SVDPlusPlus {

301

/**

302

* Configuration for SVD++ algorithm

303

* @param rank Number of latent factors

304

* @param maxIters Maximum iterations

305

* @param minVal Minimum rating value

306

* @param maxVal Maximum rating value

307

* @param gamma1 Learning rate for the user and item bias terms

308

* @param gamma2 Learning rate for the latent factor vectors

309

* @param gamma6 Regularization coefficient for the bias terms

310

* @param gamma7 Regularization coefficient for the latent factor vectors

311

*/

312

case class Conf(

313

rank: Int = 10,

314

maxIters: Int = 2,

315

minVal: Double = 0.0,

316

maxVal: Double = 5.0,

317

gamma1: Double = 0.007,

318

gamma2: Double = 0.007,

319

gamma6: Double = 0.005,

320

gamma7: Double = 0.015

321

)

322

323

/**

324

* Run SVD++ collaborative filtering

325

* @param edges RDD of rating edges (user -> item with rating)

326

* @param conf Algorithm configuration

327

* @returns Tuple of (trained model graph, global mean of the training ratings)

328

*/

329

def run(

330

edges: RDD[Edge[Double]],

331

conf: Conf

332

): (Graph[(Array[Double], Array[Double], Double, Double), Double], Double)

333

}

334

```

335

336

**Usage Examples:**

337

338

```scala

339

import org.apache.spark.graphx.lib.SVDPlusPlus

340

341

// Prepare rating data as edges

342

val ratings = sc.parallelize(Array(

343

Edge(1L, 101L, 4.0), // User 1 rates item 101 as 4.0

344

Edge(1L, 102L, 2.0),

345

Edge(2L, 101L, 5.0)

346

))

347

348

// Configure SVD++

349

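val conf = new SVDPlusPlus.Conf(

350

rank = 10, // 10 latent factors

351

maxIters = 20, // 20 iterations

352

minVal = 1.0, // Min rating 1.0

353

maxVal = 5.0, // Max rating 5.0
gamma1 = 0.007, gamma2 = 0.007, // Learning rates, passed explicitly
gamma6 = 0.005, gamma7 = 0.015 // Regularization coefficients, passed explicitly

354

)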

355

356

// Train model

357

val (model, meanRating) = SVDPlusPlus.run(ratings, conf)

358

println(f"Global mean rating: $meanRating%.3f")

359

360

// Extract learned factors

361

val userFactors = model.vertices.filter(_._1 < 100).collect // Users have ID < 100

362

val itemFactors = model.vertices.filter(_._1 >= 100).collect // Items have ID >= 100

363

```

364

365

### Graph Utilities for Algorithms

366

367

Helper functions and operations commonly used with graph algorithms.

368

369

```scala { .api }

370

class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {

371

/**

372

* Collect neighbor vertex IDs for each vertex

373

* @param edgeDirection Direction of edges to consider

374

* @returns VertexRDD with arrays of neighbor IDs

375

*/

376

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

377

378

/**

379

* Collect neighbor vertices and their attributes

380

* @param edgeDirection Direction of edges to consider

381

* @returns VertexRDD with arrays of (VertexId, VertexAttribute) pairs

382

*/

383

def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

384

385

/**

386

* Collect incident edges for each vertex

387

* @param edgeDirection Direction of edges to collect

388

* @returns VertexRDD with arrays of incident edges

389

*/

390

def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]]

391

392

/**

393

* Remove self-loops from the graph

394

* @returns Graph with self-loops removed

395

*/

396

def removeSelfEdges(): Graph[VD, ED]

397

398

/**

399

* Convert to canonical edge direction (srcId < dstId)

400

* @param mergeFunc Function to merge duplicate edges after canonicalization

401

* @returns Canonicalized graph

402

*/

403

def convertToCanonicalEdges(mergeFunc: (ED, ED) => ED): Graph[VD, ED]

404

405

/**

406

* Pick a random vertex from the graph

407

* @returns Random vertex ID

408

*/

409

def pickRandomVertex(): VertexId

410

}

411

```

412

413

## Algorithm Performance Tips

414

415

### PageRank Optimization

416

417

```scala

418

// For large graphs, use static PageRank with appropriate iterations

419

val ranks = graph

420

.partitionBy(PartitionStrategy.EdgePartition2D) // Optimize partitioning

421

.cache() // Cache for iterations

422

.staticPageRank(20)

423

424

// For personalized PageRank on multiple sources

425

val parallelRanks = PageRank.runParallelPersonalizedPageRank(

426

graph, numIter = 10, resetProb = 0.15, sources = Array(1L, 2L, 3L)

427

)

428

```

429

430

### Connected Components for Large Graphs

431

432

```scala

433

// Use iteration limit for very large graphs to avoid excessive computation

434

val components = ConnectedComponents.run(graph, maxIterations = 50)

435

436

// Pre-partition for better performance

437

val partitionedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)

438

val fasterComponents = partitionedGraph.connectedComponents()

439

```

440

441

### Triangle Counting Preparation

442

443

```scala

444

// Triangle counting requires canonical edges - GraphX will canonicalize automatically

445

// but you can pre-canonicalize for better performance

446

val canonicalGraph = graph.removeSelfEdges().convertToCanonicalEdges((a, b) => a) // runPreCanonicalized also expects no self-loops

447

val triangles = TriangleCount.runPreCanonicalized(canonicalGraph)

448

449

// Remove self-loops before triangle counting for accuracy

450

val cleanGraph = graph.removeSelfEdges()

451

val triangleCounts = cleanGraph.triangleCount()

452

```

453

454

## Common Algorithm Patterns

455

456

### Iterative Convergence

457

458

```scala

459

// Pattern for implementing custom convergence-based algorithms

460

def iterativeAlgorithm[ED: ClassTag](graph: Graph[Double, ED], tolerance: Double): Graph[Double, ED] = {

461

var g = graph.cache()

462

var converged = false

463

var iteration = 0

464

465

while (!converged && iteration < 100) {

466

val newG = g.pregel(/* pregel parameters */)

467

468

// Check convergence by comparing vertex attributes

469

val changes = g.vertices.join(newG.vertices).map {

470

case (id, (oldAttr, newAttr)) => math.abs(oldAttr - newAttr)

471

}.max()

472

473

converged = changes < tolerance

474

g.unpersist(blocking = false)

475

g = newG.cache()

476

iteration += 1

477

}

478

479

g

480

}

481

```

482

483

### Community Detection Pipeline

484

485

```scala

486

// Complete community detection and analysis pipeline

487

val graph = loadGraph()

488

489

// 1. Find communities using label propagation

490

val communities = LabelPropagation.run(graph, maxSteps = 10)

491

492

// 2. Analyze community structure

493

val communitySizes = communities.vertices

494

.map { case (id, community) => (community, 1) }

495

.reduceByKey(_ + _)

496

.collect()

497

498

// 3. Count triangles inside communities (edges that cross community boundaries are dropped first)

499

val trianglesPerCommunity = communities

500

.subgraph(epred = triplet => triplet.srcAttr == triplet.dstAttr)

501

.triangleCount()

502

503

println(s"Found ${communitySizes.length} communities")

504

communitySizes.foreach { case (community, size) =>

505

println(s"Community $community has $size vertices")

506

}

507

```