or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-graph-api.md, graph-algorithms.md, index.md, pregel-api.md, utilities.md

docs/utilities.md

0

# Utilities

1

2

Graph loading, generation, and utility functions for creating test graphs, importing data, performance optimization, and working with GraphX efficiently.

3

4

## Capabilities

5

6

### Graph Loading

7

8

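Load graphs from edge-list text files (one `srcId dstId` pair per line). For other formats, parse the data into vertex and edge RDDs yourself and build the graph with `Graph(...)` or `Graph.fromEdges(...)`.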

9

10

```scala { .api }

11

object GraphLoader {
  /**
   * Load a graph from an edge list file.
   *
   * Each line holds a source vertex ID and a destination vertex ID separated by whitespace
   * ("srcId dstId"). Lines starting with `#` are skipped, and a line with fewer than two
   * fields raises an IllegalArgumentException. Columns after the second are ignored:
   * edge weights are NOT read, and every edge and vertex attribute is set to 1.
   *
   * @param sc SparkContext
   * @param path Path to the edge list file (any path accepted by SparkContext.textFile)
   * @param canonicalOrientation If true, an edge with srcId > dstId is flipped so that
   *                             every edge points from the smaller to the larger ID
   * @param numEdgePartitions Number of edge partitions (-1 for the default)
   * @param edgeStorageLevel Storage level for the edges
   * @param vertexStorageLevel Storage level for the vertices
   * @return Graph whose vertex and edge attributes are all 1
   */
  def edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    numEdgePartitions: Int = -1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[Int, Int]
}

31

```

32

33

**Usage Examples:**

34

35

```scala

36

import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Load graph from edge list file
// File format: one "srcId dstId" pair per line; lines starting with '#' are skipped.
// Extra columns (e.g. a weight) are ignored -- every edge attribute is set to 1.
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")

// Load with canonical orientation (edges flipped so they point from smaller to larger ID)
val canonicalGraph = GraphLoader.edgeListFile(
  sc,
  "path/to/edges.txt",
  canonicalOrientation = true
)

// Load with custom partitioning and storage
val optimizedGraph = GraphLoader.edgeListFile(
  sc,
  "hdfs://cluster/large-graph.txt",
  canonicalOrientation = false,
  numEdgePartitions = 100,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_ONLY
)

58

```

59

60

### Graph Generators

61

62

Generate synthetic graphs for testing, benchmarking, and algorithm development.

63

64

```scala { .api }

65

object GraphGenerators {
  /** Fixed R-MAT quadrant probabilities used by rmatGraph */
  val RMATa: Double = 0.45
  val RMATb: Double = 0.15
  val RMATd: Double = 0.25
  // RMATc = 1.0 - RMATa - RMATb - RMATd = 0.15

  /**
   * Generate a graph whose vertex out-degrees follow a log-normal distribution.
   *
   * @param sc SparkContext
   * @param numVertices Number of vertices
   * @param numEParts Number of edge partitions (0 uses sc.defaultParallelism)
   * @param mu Mean of the underlying normal distribution
   * @param sigma Standard deviation of the underlying normal distribution
   * @param seed Random seed for reproducibility (-1 picks a random seed)
   * @return Graph with Long vertex attributes and Int edge attributes
   */
  def logNormalGraph(
    sc: SparkContext,
    numVertices: Int,
    numEParts: Int = 0,
    mu: Double = 4.0,
    sigma: Double = 1.3,
    seed: Long = -1
  ): Graph[Long, Int]

  /**
   * Generate an R-MAT graph. The quadrant probabilities are fixed to RMATa, RMATb,
   * RMATc (implied) and RMATd; they cannot be passed as arguments. Edges are generated
   * on the driver, so numEdges must fit comfortably in driver memory.
   *
   * @param sc SparkContext
   * @param requestedNumVertices Desired number of vertices (rounded up to a power of 2)
   * @param numEdges Number of distinct edges to generate; must be at most
   *                 (numVertices / 2)^2, otherwise an IllegalArgumentException is thrown
   * @return R-MAT graph whose vertex attribute is the out-degree and edge attribute is 1
   */
  def rmatGraph(
    sc: SparkContext,
    requestedNumVertices: Int,
    numEdges: Int
  ): Graph[Int, Int]

  /**
   * Generate a star graph: vertex 0 is the center, and each vertex 1 until nverts has
   * one edge pointing to it.
   *
   * @param sc SparkContext
   * @param nverts Number of vertices (including the center)
   * @return Star graph with all vertex and edge attributes set to 1
   */
  def starGraph(
    sc: SparkContext,
    nverts: Int
  ): Graph[Int, Int]

  /**
   * Generate a 2D grid graph. Vertex (r, c) has ID r * cols + c and its attribute is the
   * (row, column) pair. Each cell has a directed edge (attribute 1.0) to the cell below
   * it and to the cell on its right.
   *
   * @param sc SparkContext
   * @param rows Number of rows in the grid
   * @param cols Number of columns in the grid
   * @return Grid graph
   */
  def gridGraph(
    sc: SparkContext,
    rows: Int,
    cols: Int
  ): Graph[(Int, Int), Double]
}

142

```

143

144

**Usage Examples:**

145

146

```scala

147

import org.apache.spark.graphx.util.GraphGenerators

// Generate log-normal degree distribution graph (skewed degrees, like many real networks)
val socialGraph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)

// Generate R-MAT graph (quadrant probabilities are fixed at RMATa/RMATb/RMATd)
val rmatGraph = GraphGenerators.rmatGraph(sc, requestedNumVertices = 1024, numEdges = 5000)

// rmatGraph does not accept custom a/b/c/d probabilities; only the size can be changed.
// numEdges must not exceed (numVertices / 2)^2 after rounding numVertices up to a power of 2.
val largerRMAT = GraphGenerators.rmatGraph(sc, requestedNumVertices = 2048, numEdges = 10000)

// Generate star graph for testing centrality algorithms
val starGraph = GraphGenerators.starGraph(sc, nverts = 100)

// Generate grid graph for spatial algorithms
val gridGraph = GraphGenerators.gridGraph(sc, rows = 10, cols = 10)

// Use generated graphs for testing
val pageRanks = rmatGraph.pageRank(0.001).vertices
val components = socialGraph.connectedComponents().vertices

172

```

173

174

### Graph Utilities and Optimization

175

176

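Utility functions for graph optimization, serialization, and performance tuning. Note that only `GraphXUtils` is public: `PeriodicGraphCheckpointer` (`private[spark]`) and `BytecodeUtils` (`private[graphx]`) are internal Spark classes, listed here for reference only, and cannot be used from application code.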

177

178

```scala { .api }

179

object GraphXUtils {
  /**
   * Adds the classes GraphX ships between tasks to the Kryo registration list of `conf`,
   * so they serialize compactly once Kryo is the active serializer.
   *
   * @param conf SparkConf that is updated in place
   */
  def registerKryoClasses(conf: SparkConf): Unit
}

186

187

// Internal API: declared private[spark], so application code cannot instantiate it.
private[spark] class PeriodicGraphCheckpointer[VD: ClassTag, ED: ClassTag](
  checkpointInterval: Int,
  sc: SparkContext
) {
  /**
   * Register the newest graph of an iterative computation. It is persisted, older
   * persisted graphs are released, and every `checkpointInterval` updates the graph is
   * checkpointed. Checkpointing happens only if a checkpoint directory is set on `sc`;
   * a checkpointInterval of -1 disables it.
   *
   * @param graph New graph to manage
   */
  def update(graph: Graph[VD, ED]): Unit

  /** Delete every checkpoint file written so far (call when the computation is done). */
  def deleteAllCheckpoints(): Unit

  /** Delete every checkpoint file except the most recent one. */
  def deleteAllCheckpointsButLast(): Unit

  // There is no public checkpoint() method: checkpointing is triggered only by update().
}

207

208

// Internal API: declared private[graphx], so it is not callable from application code.
private[graphx] object BytecodeUtils {
  /**
   * Test, by inspecting the closure's bytecode, whether it invokes a specific method.
   * GraphX uses this to detect which triplet fields a user function reads.
   *
   * @param closure Function closure to analyze
   * @param targetClass Class declaring the target method
   * @param targetMethod Name of the method to look for
   * @return true if the closure calls targetClass.targetMethod
   */
  def invokedMethod(
    closure: AnyRef,
    targetClass: Class[_],
    targetMethod: String
  ): Boolean
}

222

```

223

224

### Partition Strategies

225

226

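Strategies for assigning edges to partitions; apply one with `graph.partitionBy(strategy)`. The choice determines how many partitions each vertex is replicated to, and therefore how much data is shuffled during computation.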

227

228

```scala { .api }

229

/**
 * Strategy for assigning edges to edge partitions based on their source and destination
 * vertex IDs. Implementations are shipped to executors, so they must be serializable.
 */
trait PartitionStrategy extends Serializable {
  /**
   * Determine which partition an edge should be assigned to
   * @param src Source vertex ID
   * @param dst Destination vertex ID
   * @param numParts Total number of partitions
   * @return Partition ID for this edge, in [0, numParts)
   */
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

239

240

object PartitionStrategy {
  /**
   * 2D partitioning of the sparse adjacency matrix, bounding vertex replication at
   * 2 * sqrt(numParts). When numParts is not a perfect square, a slightly different
   * block layout is used.
   */
  val EdgePartition2D: PartitionStrategy

  /**
   * Hash partitioning by source vertex ID only, colocating all edges that share a source.
   * Simple, but high out-degree vertices can cause load imbalance.
   */
  val EdgePartition1D: PartitionStrategy

  /**
   * Hash partitioning on the (srcId, dstId) pair. Colocates all edges with the same
   * direction between two vertices; edges in opposite directions may be split apart.
   */
  val RandomVertexCut: PartitionStrategy

  /**
   * Like RandomVertexCut, but hashes the pair with the smaller vertex ID first, so all
   * edges between two vertices are colocated regardless of direction.
   */
  val CanonicalRandomVertexCut: PartitionStrategy

  /**
   * Get a partition strategy by name.
   * @param s One of "RandomVertexCut", "EdgePartition1D", "EdgePartition2D" or
   *          "CanonicalRandomVertexCut" (case-sensitive)
   * @return Corresponding PartitionStrategy
   * @throws IllegalArgumentException if s is not one of the names above
   */
  def fromString(s: String): PartitionStrategy
}

272

```

273

274

**Usage Examples:**

275

276

```scala

277

// Repartition edges with 2D partitioning to limit vertex replication
val optimizedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)

// Pick a strategy from the graph's size (numVertices runs a Spark job to count)
val largeGraphThreshold = 1000000L
val strategy: PartitionStrategy =
  if (graph.numVertices <= largeGraphThreshold) PartitionStrategy.RandomVertexCut // simpler for small graphs
  else PartitionStrategy.EdgePartition2D // better for large graphs
val partitionedGraph = graph.partitionBy(strategy)

// Resolve a strategy from a configuration string
val strategyName = "EdgePartition1D"
val configuredStrategy = PartitionStrategy.fromString(strategyName)

291

```

292

293

### Performance Optimization Utilities

294

295

Helper operations available on every `Graph` through GraphX's implicit conversion to `GraphOps`.

296

297

```scala { .api }

298

class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
  /**
   * Remove self-loops (edges whose source and destination are the same vertex)
   * @return Graph without self-loops
   */
  def removeSelfEdges(): Graph[VD, ED]

  /**
   * Pick a random vertex ID from the graph
   * @return Random vertex ID
   */
  def pickRandomVertex(): VertexId

  /**
   * Filter the graph using predicates evaluated on a preprocessed copy of it. This
   * lets the predicates use derived data (e.g. degrees) while the result keeps the
   * original attributes.
   *
   * @param preprocess Function that derives the graph the predicates are evaluated on
   * @param epred Edge predicate (defaults to keeping every edge)
   * @param vpred Vertex predicate (defaults to keeping every vertex)
   * @return Subgraph of the ORIGINAL graph (original attributes) containing only the
   *         vertices and edges that satisfied the predicates on the preprocessed graph
   */
  def filter[VD2: ClassTag, ED2: ClassTag](
    preprocess: Graph[VD, ED] => Graph[VD2, ED2],
    epred: EdgeTriplet[VD2, ED2] => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
    vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true
  ): Graph[VD, ED]
}

// groupEdges is declared on Graph itself, not on GraphOps:
abstract class Graph[VD: ClassTag, ED: ClassTag] {
  /**
   * Merge parallel edges (same srcId and dstId) into a single edge. Only edges in the
   * same partition are merged, so call partitionBy first for correct results.
   *
   * @param merge Commutative, associative function combining duplicate edge attributes
   * @return Graph with merged parallel edges
   */
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
}

331

```

332

333

### TripletFields Optimization

334

335

Control which triplet fields are accessed to optimize message passing performance.

336

337

```scala { .api }

338

class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean) {
  // Declared in Java. Tells GraphX which EdgeTriplet/EdgeContext fields a user
  // function reads, so the system can populate only those fields.
}

object TripletFields {
  /** (useSrc, useDst, useEdge) = (false, false, false): nothing is read */
  val None: TripletFields

  /** (false, false, true): only the edge attribute is read */
  val EdgeOnly: TripletFields

  /** (true, false, true): source vertex attribute and edge attribute are read */
  val Src: TripletFields

  /** (false, true, true): destination vertex attribute and edge attribute are read */
  val Dst: TripletFields

  /** (true, true, true): every field is read (the default for aggregateMessages) */
  val All: TripletFields
}

358

```

359

360

**Usage Examples:**

361

362

```scala

363

// Pass TripletFields to aggregateMessages so only the fields sendMsg reads are populated

// In-degree: every message is the constant 1, so no triplet field is read
val inDegrees = graph.aggregateMessages[Int](
  sendMsg = ctx => ctx.sendToDst(1),
  mergeMsg = _ + _,
  tripletFields = TripletFields.None
)

// Weighted in-degree: the message is the edge attribute (assumes ED = Double)
val weightedInDegrees = graph.aggregateMessages[Double](
  sendMsg = ctx => ctx.sendToDst(ctx.attr),
  mergeMsg = _ + _,
  tripletFields = TripletFields.EdgeOnly
)

// Neighbor sum: the message is the source vertex attribute (assumes VD = Double)
val neighborSum = graph.aggregateMessages[Double](
  sendMsg = ctx => ctx.sendToDst(ctx.srcAttr),
  mergeMsg = _ + _,
  tripletFields = TripletFields.Src
)

381

```

382

383

## Performance Optimization Patterns

384

385

### Graph Construction Optimization

386

387

```scala

388

// Efficient graph construction for large datasets
def buildLargeGraph(vertices: RDD[(VertexId, String)], edges: RDD[Edge[Double]]): Graph[String, Double] = {
  // Do not pre-partition `edges` with a HashPartitioner: RDD[Edge[_]] is not a key-value
  // RDD, so `edges.partitionBy(new HashPartitioner(n))` does not compile. In any case,
  // graph.partitionBy below reshuffles the edges.

  // Spillable, serialized storage for edges; vertices stay deserialized in memory.
  // Vertices referenced by an edge but missing from `vertices` get "Unknown".
  val graph = Graph(
    vertices,
    edges,
    defaultVertexAttr = "Unknown",
    edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
    vertexStorageLevel = StorageLevel.MEMORY_ONLY
  )

  // Distribute edges with a 2D vertex cut to bound vertex replication
  graph.partitionBy(PartitionStrategy.EdgePartition2D).cache()
}

406

```

407

408

### Iterative Algorithm Optimization

409

410

```scala

411

// Optimize for iterative algorithms
def optimizeForIterativeAlgorithms[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): Graph[VD, ED] = {
  val optimized = graph
    .partitionBy(PartitionStrategy.EdgePartition2D) // better load balancing
    .cache()                                        // reused across iterations

  // Graph.checkpoint() returns Unit, so it cannot end the chain above. It only marks the
  // graph: the data is written by the next action. It must be called before any job runs
  // on the graph, and sc.setCheckpointDir(...) must have been called first.
  optimized.checkpoint()
  optimized
}

// Use with iterative algorithms
sc.setCheckpointDir("path/to/checkpoints") // required before checkpoint()
val optimizedGraph = optimizeForIterativeAlgorithms(graph)
val pageRanks = optimizedGraph.pageRank(0.001)
val components = optimizedGraph.connectedComponents()

// Clean up when done
optimizedGraph.unpersist()

429

```

430

431

### Memory Management

432

433

```scala

434

// Manage memory usage for large graphs
def memoryEfficientProcessing[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): Unit = {
  // Spark refuses to change the storage level of an RDD that is already persisted, and
  // graphs built with Graph(...) / GraphLoader are persisted at construction. Only persist
  // (and later unpersist) when the caller has not, so the caller's caching is left alone.
  // NOTE(review): checks the vertex RDD only; edges are assumed to follow suit.
  val alreadyPersisted = graph.vertices.getStorageLevel != StorageLevel.NONE
  val efficientGraph =
    if (alreadyPersisted) graph
    else graph.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized storage for large graphs

  try {
    // Process graph
    val results = efficientGraph.pageRank(0.001)
    try {
      // Process results immediately
      results.vertices.foreachPartition { iter =>
        iter.foreach { case (id, rank) =>
          // Process each result
        }
      }
    } finally {
      // The result graph may hold data cached during PageRank; release it too
      results.unpersist(blocking = false)
    }
  } finally {
    // Always clean up what this method persisted
    if (!alreadyPersisted) efficientGraph.unpersist(blocking = false)
  }
}

458

```

459

460

### Kryo Serialization Setup

461

462

```scala

463

// Configure Spark for optimal GraphX performance
def configureSparkForGraphX(appName: String): SparkContext = {
  val conf = new SparkConf()
    .setAppName(appName)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Optional tuning: don't wait for data-local task slots; benchmark before adopting
    .set("spark.locality.wait", "0s")

  // Register GraphX classes with Kryo. This replaces the deprecated
  // "spark.kryo.registrator" = GraphKryoRegistrator setting. No "spark.sql.*" options are
  // set because GraphX runs on RDDs, not Spark SQL.
  GraphXUtils.registerKryoClasses(conf)

  new SparkContext(conf)
}

477

```

478

479

## Common Utility Patterns

480

481

### Graph Validation and Debugging

482

483

```scala

484

// Validate graph structure and properties
// numVertices, numEdges, degrees and connectedComponents come from GraphOps, whose
// implicit conversion requires ClassTags for both type parameters.
def validateGraph[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Unit = {
  // Each count runs a Spark job; compute once and reuse below
  val numVertices = graph.numVertices
  val numEdges = graph.numEdges
  println(s"Vertices: $numVertices")
  println(s"Edges: $numEdges")

  // Check for self-loops
  val selfLoops = graph.edges.filter(e => e.srcId == e.dstId).count()
  println(s"Self-loops: $selfLoops")

  // Check degree distribution. graph.degrees omits degree-0 vertices, so:
  // - fold from 0 instead of max(), which throws on an edgeless graph
  // - average over ALL vertices (the degrees sum to 2 * numEdges)
  val maxDegree = graph.degrees.map(_._2).fold(0)((a, b) => math.max(a, b))
  val avgDegree = if (numVertices == 0) 0.0 else 2.0 * numEdges / numVertices
  println(s"Max degree: $maxDegree, Average degree: $avgDegree")

  // Check connectivity
  val components = graph.connectedComponents().vertices.map(_._2).distinct().count()
  println(s"Connected components: $components")
}

503

```

504

505

### Graph Format Conversion

506

507

```scala

508

// Convert between different graph representations

/**
 * Build an adjacency list mapping each vertex to the IDs of its out-neighbors.
 * Vertices without outgoing edges map to an empty array. The ClassTag bound on ED is
 * required by the implicit conversion to GraphOps, which provides collectNeighborIds.
 */
def convertEdgeListToAdjacencyList[ED: ClassTag](graph: Graph[Long, ED]): RDD[(VertexId, Array[VertexId])] = {
  graph.collectNeighborIds(EdgeDirection.Out)
}

512

513

/**
 * Write the graph's edges as "srcId dstId" text lines under `path`, the same format
 * GraphLoader.edgeListFile reads. Edge and vertex attributes are not written.
 */
def saveGraphToEdgeList[VD, ED](graph: Graph[VD, ED], path: String): Unit = {
  val lines = graph.edges.map { e => s"${e.srcId} ${e.dstId}" }
  lines.saveAsTextFile(path)
}

518

519

/**
 * Load a graph from an adjacency-list text file in which each line is
 * "srcId nbr1 nbr2 ..." (whitespace-separated vertex IDs).
 *
 * Blank lines and lines starting with '#' are skipped, and leading/trailing whitespace
 * is ignored. A vertex that appears only as a source with no neighbors is kept as an
 * isolated vertex. Every vertex attribute is 0 and every edge attribute is 1.
 * Note: the input is scanned twice (once for vertices, once for edges).
 */
def loadGraphFromAdjacencyList(sc: SparkContext, path: String): Graph[Int, Int] = {
  val adjacencyList = sc.textFile(path)
    .map(_.trim)
    // Without this filter, blank lines or a leading space yield "" and toLong throws
    .filter(line => line.nonEmpty && !line.startsWith("#"))
    .map { line =>
      val ids = line.split("\\s+").map(_.toLong)
      (ids.head, ids.tail) // non-empty: the line is trimmed and non-empty
    }

  // Sources become vertices even if they have no neighbors
  val vertices = adjacencyList.map { case (src, _) => (src, 0) }

  val edges = adjacencyList.flatMap { case (src, neighbors) =>
    neighbors.map(dst => Edge(src, dst, 1))
  }

  Graph(vertices, edges, defaultVertexAttr = 0)
}

533

```