or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-graph-operations.md · graph-algorithms.md · graph-analytics.md · index.md · loading-utilities.md · message-passing.md · rdd-abstractions.md

loading-utilities.mddocs/

0

# Graph Loading and Utilities

1

2

Utilities for loading graphs from files, partitioning strategies, configuration options, and various helper functions for graph construction and management.

3

4

## Capabilities

5

6

### Graph Loading

7

8

Utilities for loading graphs from external data sources and files.

9

10

```scala { .api }

11

/**

12

* Graph loading utilities

13

*/

14

object GraphLoader {

15

/**

16

* Load graph from edge list file

17

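* Each line should contain "srcId dstId" separated by whitespace; lines starting with '#' are skipped and any additional columns are ignored (every edge attribute is set to 1)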

18

* @param sc SparkContext

19

* @param path Path to edge list file

20

* @param canonicalOrientation If true, orient edges src < dst (for undirected graphs)

21

* @param numEdgePartitions Number of partitions for edges (-1 for default)

22

* @param edgeStorageLevel Storage level for edge RDD

23

* @param vertexStorageLevel Storage level for vertex RDD

24

* @return Graph with Int vertex attributes and Int edge attributes

25

*/

26

def edgeListFile(

27

sc: SparkContext,

28

path: String,

29

canonicalOrientation: Boolean = false,

30

numEdgePartitions: Int = -1,

31

edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,

32

vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

33

): Graph[Int, Int]

34

}

35

```

36

37

**Usage Examples:**

38

39

```scala

40

import org.apache.spark.graphx._

41

import org.apache.spark.storage.StorageLevel

42

43

// Load graph from simple edge list file

44

// File format: each line contains "srcId dstId"

45

val graph = GraphLoader.edgeListFile(sc, "/path/to/edges.txt")

46

47

// Load with canonical orientation (useful for undirected graphs)

48

val undirectedGraph = GraphLoader.edgeListFile(

49

sc,

50

"/path/to/edges.txt",

51

canonicalOrientation = true // Ensures srcId < dstId

52

)

53

54

// Load with custom storage and partitioning

55

val optimizedGraph = GraphLoader.edgeListFile(

56

sc,

57

"/path/to/large_graph.txt",

58

canonicalOrientation = false,

59

numEdgePartitions = 100, // More partitions for large graphs

60

edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,

61

vertexStorageLevel = StorageLevel.MEMORY_ONLY

62

)

63

64

// Example edge list file content:

65

// 1 2

66

// 2 3

67

// 3 1

68

// 1 4

69

```

70

71

### Custom Graph Loading Patterns

72

73

Common patterns for loading graphs from various data sources.

74

75

```scala

76

// Load from CSV files with custom parsing

77

/**
 * Builds a graph from a CSV edge list whose rows are `srcId,dstId,weight`.
 *
 * Blank lines are skipped and whitespace around each field is ignored. Vertex
 * attributes are synthesized as `Vertex_<id>` for every id that appears in an edge.
 *
 * @param sc   SparkContext used to read the file
 * @param path location of the CSV file
 * @return graph with String vertex attributes and Double edge attributes
 */
def loadGraphFromCSV(sc: SparkContext, path: String): Graph[String, Double] = {
  val edges = sc.textFile(path)
    .map(_.trim)
    .filter(_.nonEmpty) // a blank (e.g. trailing) line would otherwise fail in toLong
    .map { line =>
      // Trim each field: "1, 2, 0.5" would otherwise throw NumberFormatException on " 2".
      val parts = line.split(",").map(_.trim)
      Edge(parts(0).toLong, parts(1).toLong, parts(2).toDouble)
    }

  // Every endpoint mentioned by at least one edge becomes a vertex.
  val vertices = edges
    .flatMap(edge => Seq(edge.srcId, edge.dstId))
    .distinct()
    .map(id => (id, s"Vertex_$id"))

  Graph(vertices, edges)
}

90

91

// Load from JSON data

92

/**
 * Builds a graph from a text file in which every line is one JSON document.
 *
 * Each document is read for an `id`, an `attributes` object, and an `edges` array
 * whose entries carry a `target` and a `type` field.
 *
 * @param sc   SparkContext used to read the file
 * @param path location of the line-delimited JSON file
 * @return graph whose vertex attribute is the `attributes` map and whose edge
 *         attribute is the edge `type`
 */
def loadGraphFromJSON(sc: SparkContext, path: String): Graph[Map[String, String], String] = {
  import org.json4s._
  import org.json4s.jackson.JsonMethods._

  // `extract` takes an implicit Formats; without one in scope the calls below do not compile.
  implicit val formats: Formats = DefaultFormats

  val jsonData = sc.textFile(path).map(parse(_))

  // Extract vertices from JSON
  val vertices = jsonData.map { json =>
    val id = (json \ "id").extract[Long]
    val attributes = (json \ "attributes").extract[Map[String, String]]
    (id, attributes)
  }

  // Extract edges from JSON: one Edge per entry of the document's "edges" array
  val edges = jsonData.flatMap { json =>
    val srcId = (json \ "id").extract[Long]
    (json \ "edges").extract[List[Map[String, String]]].map { edgeData =>
      val dstId = edgeData("target").toLong
      val edgeType = edgeData("type")
      Edge(srcId, dstId, edgeType)
    }
  }

  Graph(vertices, edges)
}

117

118

// Load from database query results

119

/**
 * Template for assembling a graph from rows already fetched from a database.
 *
 * Both row collections are empty placeholders; fill them with the results of
 * your own queries.
 *
 * @param sc SparkContext used to distribute the rows
 * @return graph with String vertex attributes and Int edge attributes
 */
def loadGraphFromDatabase(sc: SparkContext): Graph[String, Int] = {
  // Placeholder: query results converted to (id, attribute) pairs
  val vertexRows = Seq.empty[(VertexId, String)]
  val vertices: RDD[(VertexId, String)] = sc.parallelize(vertexRows)

  // Placeholder: query results converted to Edge objects
  val edgeRows = Seq.empty[Edge[Int]]
  val edges: RDD[Edge[Int]] = sc.parallelize(edgeRows)

  Graph(vertices, edges)
}

131

```

132

133

### Partitioning Strategies

134

135

Different strategies for partitioning graph data across cluster nodes.

136

137

```scala { .api }

138

/**

139

* Base trait for edge partitioning strategies

140

*/

141

trait PartitionStrategy extends Serializable {

142

/**

143

* Determine which partition an edge should be assigned to

144

* @param src Source vertex ID

145

* @param dst Destination vertex ID

146

* @param numParts Total number of partitions

147

* @return Partition ID for this edge

148

*/

149

def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID

150

}

151

152

/**

153

* Built-in partitioning strategies

154

*/

155

object PartitionStrategy {

156

/**

157

* 2D edge partitioning with sqrt(numParts) x sqrt(numParts) grid

158

* Guarantees vertex replication bound of 2*sqrt(numParts)

159

* Good for graphs with many high-degree vertices

160

*/

161

case object EdgePartition2D extends PartitionStrategy

162

163

/**

164

* Hash partitioning using only source vertex ID

165

* Colocates all edges from same source vertex

166

* Good for algorithms that iterate over out-edges

167

*/

168

case object EdgePartition1D extends PartitionStrategy

169

170

/**

171

* Random vertex cut partitioning

172

* Randomly assigns edges while trying to balance partition sizes

173

* Colocates edges in same direction between vertex pairs

174

*/

175

case object RandomVertexCut extends PartitionStrategy

176

177

/**

178

* Canonical random vertex cut partitioning

179

* Like RandomVertexCut but ensures both directions of edge go to same partition

180

* Good for undirected graph algorithms

181

*/

182

case object CanonicalRandomVertexCut extends PartitionStrategy

183

}

184

```

185

186

**Usage Examples:**

187

188

```scala

189

// Apply different partitioning strategies

190

val graph = loadGraph()

191

192

// 2D partitioning - good for high-degree vertices

193

val graph2D = graph.partitionBy(PartitionStrategy.EdgePartition2D)

194

195

// 1D partitioning - good for out-edge iteration

196

val graph1D = graph.partitionBy(PartitionStrategy.EdgePartition1D)

197

198

// Random vertex cut - general purpose

199

val graphRandom = graph.partitionBy(PartitionStrategy.RandomVertexCut)

200

201

// Canonical random - good for undirected graphs

202

val graphCanonical = graph.partitionBy(PartitionStrategy.CanonicalRandomVertexCut)

203

204

// Choose strategy based on workload

205

val strategy = if (maxDegree > 1000) {

206

PartitionStrategy.EdgePartition2D // Handle high-degree vertices

207

} else if (algorithmType == "PageRank") {

208

PartitionStrategy.RandomVertexCut // Good for iterative algorithms

209

} else {

210

PartitionStrategy.EdgePartition1D // Default choice

211

}

212

213

val optimizedGraph = graph.partitionBy(strategy)

214

```

215

216

### GraphX Utilities

217

218

Utility functions for GraphX configuration and optimization.

219

220

```scala { .api }

221

/**

222

* GraphX utility functions

223

*/

224

object GraphXUtils {

225

/**

226

* Register GraphX classes with Kryo serialization for better performance

227

* Call this before creating SparkContext for optimal serialization

228

* @param conf SparkConf to register classes with

229

*/

230

def registerKryoClasses(conf: SparkConf): Unit

231

}

232

```

233

234

**Usage Examples:**

235

236

```scala

237

import org.apache.spark.{SparkConf, SparkContext}

238

import org.apache.spark.graphx.util.GraphXUtils

239

240

// Configure Spark for optimal GraphX performance

241

val conf = new SparkConf()

242

.setAppName("GraphX Application")

243

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

244

245

// Register GraphX classes for efficient serialization

246

GraphXUtils.registerKryoClasses(conf)

247

248

val sc = new SparkContext(conf)

249

250

// Now create and process graphs with optimized serialization

251

val graph = loadGraph()

252

```

253

254

### Graph Generators

255

256

Utility functions for generating synthetic graphs for testing and benchmarking.

257

258

```scala { .api }

259

/**

260

* Graph generation utilities for testing and benchmarking

261

*/

262

object GraphGenerators {

263

/**

264

* Generate log-normal degree distribution graph

265

* @param sc SparkContext

266

* @param numVertices Number of vertices to generate

267

* @param numEParts Number of edge partitions

268

* @param mu Mean of underlying normal distribution

269

* @param sigma Standard deviation of underlying normal distribution

270

* @param seed Random seed (-1 for random seed)

271

* @return Generated graph with Long vertex attributes and Int edge attributes

272

*/

273

def logNormalGraph(

274

sc: SparkContext,

275

numVertices: Int,

276

numEParts: Int = 0,

277

mu: Double = 4.0,

278

sigma: Double = 1.3,

279

seed: Long = -1

280

): Graph[Long, Int]

281

282

/**

283

* Generate R-MAT graph (Recursive Matrix)

284

* @param sc SparkContext

285

* @param requestedNumVertices Approximate number of vertices

286

* @param numEParts Number of edge partitions

287

* @return Generated R-MAT graph

288

*/

289

def rmatGraph(

290

sc: SparkContext,

291

requestedNumVertices: Int,

292

numEParts: Int

293

): Graph[Int, Int]

294

295

/**

296

* Generate star graph (one central vertex connected to all others)

297

* @param sc SparkContext

298

* @param nverts Total number of vertices

299

* @return Star graph

300

*/

301

def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int]

302

303

/**

304

* Generate grid graph (2D grid topology)

305

* @param sc SparkContext

306

* @param rows Number of rows in grid

307

* @param cols Number of columns in grid

308

* @return Grid graph with coordinate vertex attributes

309

*/

310

def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int, Int), Double]

311

}

312

```

313

314

**Usage Examples:**

315

316

```scala

317

import org.apache.spark.graphx.util.GraphGenerators

318

319

// Generate synthetic graphs for testing algorithms

320

val logNormalGraph = GraphGenerators.logNormalGraph(sc, 1000, numEParts = 4)

321

println(s"Log-normal graph: ${logNormalGraph.numVertices} vertices, ${logNormalGraph.numEdges} edges")

322

323

// Generate R-MAT graph (commonly used benchmark)

324

val rmatGraph = GraphGenerators.rmatGraph(sc, 1000, 4)

325

println(s"R-MAT graph: ${rmatGraph.numVertices} vertices, ${rmatGraph.numEdges} edges")

326

327

// Generate star graph for testing centrality algorithms

328

val starGraph = GraphGenerators.starGraph(sc, 100)

329

val centerVertex = starGraph.degrees.max()(Ordering.by(_._2))

330

println(s"Star graph center vertex: ${centerVertex._1} with degree ${centerVertex._2}")

331

332

// Generate grid graph for spatial algorithms

333

val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)

334

gridGraph.vertices.take(5).foreach { case (id, (row, col)) =>

335

println(s"Vertex $id at position ($row, $col)")

336

}

337

338

// Use synthetic graphs for algorithm benchmarking

339

/**
 * Times a 10-iteration PageRank run on several synthetic graphs and prints the
 * elapsed wall-clock time for each.
 */
def benchmarkAlgorithm(): Unit = {
  import scala.reflect.ClassTag
  import org.apache.spark.graphx.Graph
  import org.apache.spark.graphx.lib.PageRank

  // The generated graphs have different vertex/edge attribute types, so they cannot be
  // put in one Seq and still provide the ClassTags PageRank.run requires. A generic
  // helper keeps the concrete types at each call site instead.
  def timePageRank[VD: ClassTag, ED: ClassTag](name: String, graph: Graph[VD, ED]): Unit = {
    val startTime = System.nanoTime()
    val ranks = PageRank.run(graph, 10)
    ranks.vertices.count() // action inside the timed region so the result is materialized
    val elapsedMs = (System.nanoTime() - startTime) / 1000000L

    println(s"$name graph PageRank: ${elapsedMs}ms")
  }

  timePageRank("LogNormal", GraphGenerators.logNormalGraph(sc, 1000))
  timePageRank("RMAT", GraphGenerators.rmatGraph(sc, 1000, 4))
  timePageRank("Star", GraphGenerators.starGraph(sc, 1000))
  timePageRank("Grid", GraphGenerators.gridGraph(sc, 32, 32))
}

355

```

356

357

### Configuration and Performance Tuning

358

359

Best practices for configuring GraphX applications.

360

361

```scala

362

// Optimal Spark configuration for GraphX

363

val conf = new SparkConf()

364

.setAppName("GraphX Application")

365

// Serialization

366

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

367

// Memory management

368

.set("spark.executor.memory", "8g")

369

.set("spark.executor.cores", "4")

370

// Spark SQL adaptive execution (applies to DataFrame/SQL stages only; GraphX itself is RDD-based)

371

.set("spark.sql.adaptive.enabled", "true")

372

.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

373

// GraphX specific

374

.set("spark.graphx.pregel.checkpointInterval", "10")

375

376

GraphXUtils.registerKryoClasses(conf)

377

378

// Graph-specific optimizations

379

/**
 * Repartitions the graph's edges with `EdgePartition2D` and caches the result.
 *
 * @param graph graph to prepare
 * @return the repartitioned, cached graph
 */
def optimizeGraph[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VD, ED] = {
  // Good general partitioning
  val repartitioned = graph.partitionBy(PartitionStrategy.EdgePartition2D)
  // Cache for iterative algorithms
  repartitioned.cache()
}

384

385

// Storage level selection based on graph size and memory

386

/**
 * Picks a storage level from the size of the graph.
 *
 * @param numVertices number of vertices in the graph
 * @param numEdges    number of edges in the graph
 * @return `MEMORY_AND_DISK_SER` above 10M vertices or 50M edges, `MEMORY_ONLY_SER`
 *         above 1M vertices, otherwise `MEMORY_ONLY`
 */
def selectStorageLevel(numVertices: Long, numEdges: Long): StorageLevel =
  (numVertices, numEdges) match {
    // Large graphs - serialized, spilling to disk when memory runs out
    case (v, e) if v > 10000000 || e > 50000000 => StorageLevel.MEMORY_AND_DISK_SER
    // Medium graphs - serialized to save space
    case (v, _) if v > 1000000 => StorageLevel.MEMORY_ONLY_SER
    // Small graphs - kept deserialized in memory
    case _ => StorageLevel.MEMORY_ONLY
  }

395

396

// Dynamic partitioning based on graph characteristics

397

/**
 * Chooses a partition strategy from the graph's degree distribution.
 *
 * @param graph graph to inspect
 * @return `EdgePartition2D` when the maximum degree exceeds 100x the mean degree,
 *         `RandomVertexCut` when the mean degree exceeds 50, otherwise
 *         `EdgePartition1D` (also returned when no vertex has any edge)
 */
def selectPartitionStrategy[VD, ED](graph: Graph[VD, ED]): PartitionStrategy = {
  // `graph.degrees` comes from the implicit Graph => GraphOps conversion, which needs
  // ClassTags for VD and ED; this signature has none, so use the graph's own `ops`.
  // One `stats()` pass yields both max and mean instead of scanning the degrees twice.
  val degreeStats = graph.ops.degrees.map(_._2.toDouble).stats()

  if (degreeStats.count == 0) {
    // Only vertices with at least one edge have a degree entry; nothing to compare.
    PartitionStrategy.EdgePartition1D
  } else {
    val maxDegree = degreeStats.max
    val avgDegree = degreeStats.mean

    if (maxDegree > avgDegree * 100) {
      PartitionStrategy.EdgePartition2D // High degree variance
    } else if (avgDegree > 50) {
      PartitionStrategy.RandomVertexCut // High average degree
    } else {
      PartitionStrategy.EdgePartition1D // Low degree graphs
    }
  }
}

409

```

410

411

### Data Import/Export Utilities

412

413

Utilities for saving and loading graphs in various formats.

414

415

```scala

416

// Save graph to files

417

/**
 * Writes a graph as two tab-separated text datasets.
 *
 * @param graph      graph to persist
 * @param vertexPath output location for `id<TAB>attribute` lines
 * @param edgePath   output location for `srcId<TAB>dstId<TAB>attribute` lines
 */
def saveGraph[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED],
  vertexPath: String,
  edgePath: String
): Unit = {
  // One line per vertex: id, then the attribute's string form
  val vertexLines = graph.vertices.map { case (id, attr) => s"$id\t$attr" }
  vertexLines.saveAsTextFile(vertexPath)

  // One line per edge: source id, destination id, then the attribute's string form
  val edgeLines = graph.edges.map(e => s"${e.srcId}\t${e.dstId}\t${e.attr}")
  edgeLines.saveAsTextFile(edgePath)
}

430

431

// Load graph from saved files

432

/**
 * Reads a graph back from tab-separated text files: `id<TAB>attribute` lines for
 * vertices and `srcId<TAB>dstId<TAB>attribute` lines for edges.
 *
 * @param sc         SparkContext used to read the files
 * @param vertexPath location of the vertex lines
 * @param edgePath   location of the edge lines
 * @return graph whose vertex and edge attributes are the raw attribute strings
 */
def loadGraph(
  sc: SparkContext,
  vertexPath: String,
  edgePath: String
): Graph[String, String] = {
  // Limit the split so that an empty attribute ("5\t") still yields two fields and an
  // attribute that itself contains tabs is kept whole.
  val vertices = sc.textFile(vertexPath).map { line =>
    val parts = line.split("\t", 2)
    (parts(0).toLong, parts(1))
  }

  val edges = sc.textFile(edgePath).map { line =>
    val parts = line.split("\t", 3)
    Edge(parts(0).toLong, parts(1).toLong, parts(2))
  }

  Graph(vertices, edges)
}

449

450

// Export to standard graph formats

451

/**
 * Renders a graph as a GraphML document.
 *
 * All nodes and edges are collected to the driver, so this is only suitable for
 * graphs that fit in driver memory. Writing the document to `path` is left as a
 * placeholder because it depends on the target file system.
 *
 * @param graph graph to export
 * @param path  intended output location (not yet written to)
 */
def exportToGraphML[VD, ED](graph: Graph[VD, ED], path: String): Unit = {
  // Attribute values are arbitrary text; escape XML metacharacters so the document
  // stays well-formed.
  val escapeXml: Any => String = value =>
    s"$value"
      .replace("&", "&amp;")
      .replace("<", "&lt;")
      .replace(">", "&gt;")
      .replace("\"", "&quot;")

  // Generate GraphML XML format
  val vertices = graph.vertices.map { case (id, attr) =>
    s"""<node id="$id"><data key="v_attr">${escapeXml(attr)}</data></node>"""
  }.collect()

  val edges = graph.edges.map { edge =>
    s"""<edge source="${edge.srcId}" target="${edge.dstId}"><data key="e_attr">${escapeXml(edge.attr)}</data></edge>"""
  }.collect()

  // Key ids must be unique within the document, so node and edge attributes get
  // distinct ids.
  val graphML = s"""<?xml version="1.0" encoding="UTF-8"?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns">
<key id="v_attr" for="node" attr.name="attribute" attr.type="string"/>
<key id="e_attr" for="edge" attr.name="attribute" attr.type="string"/>
<graph edgedefault="directed">
${vertices.mkString("\n ")}
${edges.mkString("\n ")}
</graph>
</graphml>"""

  // Save to file (pseudo-code - actual implementation depends on file system)
  // writeToFile(path, graphML)
}

474

```

475

476

## Performance Considerations

477

478

### Loading Optimization

479

480

- **File format**: Use compressed formats (gzip, snappy) for large edge lists

481

- **Partitioning**: Specify appropriate number of partitions for large files

482

- **Storage levels**: Choose memory vs. disk based on graph size

483

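- **Canonical orientation**: Use for undirected graphs so that every edge is stored with srcId < dstId; duplicate edges are not removed automatically, so call `partitionBy` followed by `groupEdges` if each undirected edge should appear only once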

484

485

### Partitioning Guidelines

486

487

- **EdgePartition2D**: Best for graphs with high-degree vertices

488

- **EdgePartition1D**: Good for algorithms that iterate over out-edges

489

- **RandomVertexCut**: General purpose, good for most iterative algorithms

490

- **CanonicalRandomVertexCut**: Use for undirected graph algorithms

491

492

### Memory Management

493

494

```scala

495

// Monitor memory usage and adjust accordingly

496

/**
 * Caches the graph's vertex and edge RDDs, prints how many elements each holds,
 * then unpersists both.
 *
 * The printed numbers are element counts, not byte sizes.
 *
 * @param graph graph to inspect
 */
def monitorGraphMemory[VD, ED](graph: Graph[VD, ED]): Unit = {
  // count() is the action that actually populates each cache
  val cachedVertices = graph.vertices.cache()
  val vertexCount = cachedVertices.count()
  val cachedEdges = graph.edges.cache()
  val edgeCount = cachedEdges.count()

  println(s"Cached vertices: $vertexCount")
  println(s"Cached edges: $edgeCount")

  // Release the cached data once the counts have been reported
  graph.vertices.unpersist()
  graph.edges.unpersist()
}

507

```