0
# Utilities
1
2
Utilities for loading graphs from files, generating synthetic test graphs, configuring serialization and partitioning, and tuning GraphX performance.
3
4
## Capabilities
5
6
### Graph Loading
7
8
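Load graphs from edge list text files on any Hadoop-supported file system (local, HDFS, S3, ...). `GraphLoader` supports only the edge list format. For other formats, parse the input into an `RDD[Edge[ED]]` and build the graph with `Graph(...)` or `Graph.fromEdges(...)`.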
9
10
```scala { .api }
11
object GraphLoader {
  /**
   * Loads a graph from an edge list text file.
   *
   * Each non-comment line must start with two whitespace-separated vertex IDs
   * ("srcId dstId"). Any further columns (for example a weight) are ignored, and
   * lines beginning with `#` are skipped. Edge weights are NOT read from the file:
   * every edge and every vertex gets the attribute value `1`.
   *
   * @param sc SparkContext
   * @param path path to the edge list file (local, HDFS or any Hadoop-supported URI)
   * @param canonicalOrientation if true, edges with srcId > dstId are reversed so that
   *                             every edge satisfies srcId <= dstId
   * @param numEdgePartitions number of edge partitions; -1 uses the default parallelism
   * @param edgeStorageLevel storage level for the edges
   * @param vertexStorageLevel storage level for the vertices
   * @return graph whose vertex and edge attributes are all 1
   */
  def edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    numEdgePartitions: Int = -1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
  ): Graph[Int, Int]
}
31
```
32
33
**Usage Examples:**
34
35
```scala
36
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Load graph from an edge list file.
// File format: one "srcId dstId" pair per line. Extra columns (e.g. a weight) are
// ignored and lines starting with '#' are skipped; all attributes are set to 1.
val graph = GraphLoader.edgeListFile(sc, "path/to/edges.txt")

// Load with canonical orientation (edges reversed where needed so srcId <= dstId)
val canonicalGraph = GraphLoader.edgeListFile(
  sc,
  "path/to/edges.txt",
  canonicalOrientation = true
)

// Load with custom partitioning and storage
val optimizedGraph = GraphLoader.edgeListFile(
  sc,
  "hdfs://cluster/large-graph.txt",
  canonicalOrientation = false,
  numEdgePartitions = 100,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_ONLY
)
58
```
59
60
### Graph Generators
61
62
Generate synthetic graphs for testing, benchmarking, and algorithm development.
63
64
```scala { .api }
65
object GraphGenerators {
  /** Default R-MAT quadrant probabilities; rmatGraph always uses these values. */
  val RMATa: Double = 0.45
  val RMATb: Double = 0.15
  val RMATd: Double = 0.25
  // RMATc = 1.0 - RMATa - RMATb - RMATd = 0.15

  /**
   * Generates a graph whose out-degrees follow a log-normal distribution.
   *
   * @param sc SparkContext
   * @param numVertices number of vertices
   * @param numEParts number of partitions; 0 means sc.defaultParallelism
   * @param mu mean of the underlying normal distribution
   * @param sigma standard deviation of the underlying normal distribution
   * @param seed random seed for reproducibility; -1 means no fixed seed
   * @return graph whose vertex attribute is the sampled out-degree of the vertex and
   *         whose edge attributes are 1
   */
  def logNormalGraph(
    sc: SparkContext,
    numVertices: Int,
    numEParts: Int = 0,
    mu: Double = 4.0,
    sigma: Double = 1.3,
    seed: Long = -1
  ): Graph[Long, Int]

  /**
   * Generates an R-MAT graph using the fixed probabilities RMATa, RMATb, RMATc, RMATd.
   * The quadrant probabilities, seed and partition count cannot be customized.
   *
   * Edges are generated on the driver and duplicates are discarded, so keep
   * numEdges modest.
   *
   * @param sc SparkContext
   * @param requestedNumVertices desired number of vertices (rounded up to a power of 2)
   * @param numEdges number of distinct edges to generate
   * @return graph whose vertex attribute is the out-degree and whose edge attributes are 1
   * @throws IllegalArgumentException if numEdges exceeds numVertices^2 / 4 for the
   *                                  rounded vertex count
   */
  def rmatGraph(sc: SparkContext, requestedNumVertices: Int, numEdges: Int): Graph[Int, Int]

  /**
   * Generates a star graph. Vertex 0 is the center, and every other vertex in
   * 1 until nverts has a single edge pointing to it.
   *
   * @param sc SparkContext
   * @param nverts number of vertices, including the center
   * @return star graph with all vertex and edge attributes equal to 1
   */
  def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int]

  /**
   * Generates a rows x cols 2D grid graph.
   *
   * Vertex (r, c) has ID r * cols + c and attribute (r, c). It has edges to (r + 1, c)
   * and to (r, c + 1) when those cells exist.
   *
   * @param sc SparkContext
   * @param rows number of rows in the grid
   * @param cols number of columns in the grid
   * @return grid graph whose edge attributes are all 1.0
   */
  def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int, Int), Double]
}
142
```
143
144
**Usage Examples:**
145
146
```scala
147
import org.apache.spark.graphx.util.GraphGenerators

// Generate a log-normal out-degree graph (heavy-tailed, similar to social networks)
val socialGraph = GraphGenerators.logNormalGraph(sc, numVertices = 1000)

// Generate an R-MAT graph; quadrant probabilities are fixed to RMATa/RMATb/RMATc/RMATd
val rmatGraph = GraphGenerators.rmatGraph(sc, requestedNumVertices = 1024, numEdges = 5000)

// Generate a larger R-MAT graph. rmatGraph has no a/b/c/d parameters, so the skew
// cannot be customized. numEdges must not exceed numVertices^2 / 4 (after rounding
// numVertices up to a power of 2).
val largerRMAT = GraphGenerators.rmatGraph(sc, requestedNumVertices = 2048, numEdges = 10000)

// Generate star graph for testing centrality algorithms
val starGraph = GraphGenerators.starGraph(sc, nverts = 100)

// Generate grid graph for spatial algorithms
val gridGraph = GraphGenerators.gridGraph(sc, rows = 10, cols = 10)

// Use generated graphs for testing
val pageRanks = rmatGraph.pageRank(0.001).vertices
val components = socialGraph.connectedComponents().vertices
172
```
173
174
### Graph Utilities and Optimization
175
176
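Utilities for Kryo serialization and checkpointing. `GraphXUtils.registerKryoClasses` is the public entry point. `PeriodicGraphCheckpointer` and `BytecodeUtils` are internal to Spark (not accessible from application code) and are listed here for reference only.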
177
178
```scala { .api }
179
object GraphXUtils {

  /**
   * Adds the classes GraphX uses internally to the Kryo registration list of `conf`,
   * which makes Kryo serialization of graph data more efficient.
   *
   * @param conf SparkConf updated in place; call this before creating the SparkContext
   */
  def registerKryoClasses(conf: SparkConf): Unit
}
186
187
// Internal to Spark: declared `private[spark]`, so application code cannot use it.
// GraphX uses it internally (e.g. in Pregel) to cut long lineages. Shown for reference.
class PeriodicGraphCheckpointer[VD: ClassTag, ED: ClassTag](
  checkpointInterval: Int, // checkpoint every N updates; -1 disables checkpointing
  sc: SparkContext
) {
  /**
   * Registers a newly computed graph.
   *
   * The graph is persisted, checkpointed every `checkpointInterval` updates (when a
   * checkpoint directory is set), and older managed graphs are unpersisted. Call this
   * before the graph is materialized by an action.
   *
   * @param graph new graph to manage
   */
  def update(graph: Graph[VD, ED]): Unit

  /** Deletes the checkpoint files of all managed graphs. */
  def deleteAllCheckpoints(): Unit

  /** Deletes the checkpoint files of all managed graphs except the most recent one. */
  def deleteAllCheckpointsButLast(): Unit
}
207
208
// Internal to GraphX: declared `private[graphx]` in Spark, so application code cannot
// call it. Shown for reference only.
object BytecodeUtils {
  /**
   * Analyzes the bytecode of a closure to test whether it calls a specific method.
   *
   * @param closure function object to analyze
   * @param targetClass class declaring the method being looked for
   * @param targetMethod name of the method being looked for
   * @return true if the analysis finds a call to `targetClass.targetMethod`
   */
  def invokedMethod(
    closure: AnyRef,
    targetClass: Class[_],
    targetMethod: String
  ): Boolean
}
222
```
223
224
### Partition Strategies
225
226
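Strategies for assigning edges to partitions. GraphX uses a vertex-cut model, so each vertex is replicated to every partition that holds one of its edges. Apply a strategy with `graph.partitionBy(strategy)`; the choice determines vertex replication and communication cost.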
227
228
```scala { .api }
229
trait PartitionStrategy extends Serializable {
  /**
   * Returns the partition an edge is assigned to.
   *
   * Strategies are shipped to executors, so implementations must be serializable.
   * They should also be deterministic: the same (src, dst, numParts) must always map
   * to the same partition.
   *
   * @param src source vertex ID
   * @param dst destination vertex ID
   * @param numParts total number of partitions
   * @return partition ID in the range [0, numParts)
   */
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}
239
240
object PartitionStrategy {
  /**
   * Assigns edges using a 2D partitioning of the sparse adjacency matrix.
   *
   * Vertex replication is bounded by 2 * sqrt(numParts). When numParts is not a
   * perfect square, Spark uses a slightly different block layout. This is usually a
   * good choice for large graphs.
   */
  val EdgePartition2D: PartitionStrategy

  /**
   * Assigns edges by hashing only the source vertex ID, which colocates all
   * out-edges of a vertex. Simple, but high out-degree vertices cause load imbalance.
   */
  val EdgePartition1D: PartitionStrategy

  /**
   * Assigns edges by hashing the (srcId, dstId) pair. Edges between the same two
   * vertices in the SAME direction land in the same partition.
   */
  val RandomVertexCut: PartitionStrategy

  /**
   * Like RandomVertexCut, but hashes the pair in canonical order (smaller ID first).
   * All edges between two vertices land together regardless of direction, which
   * helps algorithms that use both edge directions.
   */
  val CanonicalRandomVertexCut: PartitionStrategy

  /**
   * Looks up a strategy by name.
   *
   * @param s one of "RandomVertexCut", "EdgePartition1D", "EdgePartition2D" or
   *          "CanonicalRandomVertexCut"
   * @return the corresponding PartitionStrategy
   * @throws IllegalArgumentException if `s` is not one of the names above
   */
  def fromString(s: String): PartitionStrategy
}
272
```
273
274
**Usage Examples:**
275
276
```scala
277
// Repartition the edges with the 2D strategy
val optimizedGraph = graph.partitionBy(PartitionStrategy.EdgePartition2D)

// Choose a strategy from the graph size (note: numVertices runs a counting job)
val strategy =
  if (graph.numVertices <= 1000000) PartitionStrategy.RandomVertexCut // Simpler for small graphs
  else PartitionStrategy.EdgePartition2D                               // Better for large graphs
val partitionedGraph = graph.partitionBy(strategy)

// Resolve a strategy from its configured name
val strategyName = "EdgePartition1D"
val configuredStrategy = PartitionStrategy.fromString(strategyName)
291
```
292
293
### Performance Optimization Utilities
294
295
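Convenience methods available on every `Graph`. Most are defined in `GraphOps` and reached through an implicit conversion, which requires `ClassTag`s for the vertex and edge attribute types.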
296
297
```scala { .api }
298
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /**
   * Merges parallel edges (same srcId and dstId) into a single edge.
   *
   * This method is declared on `Graph` itself rather than on `GraphOps`. It only merges
   * duplicates that sit in the same partition, so call `partitionBy(...)` first to get
   * correct results.
   *
   * @param merge commutative, associative function combining the attributes of duplicates
   * @return graph in which each group of parallel edges is replaced by one edge
   */
  def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

  /**
   * Removes self-loops (edges whose srcId equals dstId).
   * @return graph without self-loops
   */
  def removeSelfEdges(): Graph[VD, ED]

  /**
   * Picks a random vertex of the graph.
   * @return ID of the chosen vertex
   */
  def pickRandomVertex(): VertexId

  /**
   * Filters the graph using predicates evaluated on a preprocessed copy of it.
   *
   * `preprocess` derives the attributes to filter on (e.g. degrees). `epred` and `vpred`
   * are evaluated on that derived graph. The matching vertices and edges are then kept
   * from the ORIGINAL graph, with their original attributes.
   *
   * @param preprocess function computing the graph the predicates are evaluated on
   * @param epred edge predicate on the preprocessed graph (default keeps every edge)
   * @param vpred vertex predicate on the preprocessed graph (default keeps every vertex)
   * @return subgraph of the original graph, with its attributes unchanged
   */
  def filter[VD2: ClassTag, ED2: ClassTag](
    preprocess: Graph[VD, ED] => Graph[VD2, ED2],
    epred: EdgeTriplet[VD2, ED2] => Boolean = (_: EdgeTriplet[VD2, ED2]) => true,
    vpred: (VertexId, VD2) => Boolean = (_: VertexId, _: VD2) => true
  ): Graph[VD, ED]
}
331
```
332
333
### TripletFields Optimization
334
335
Control which triplet fields are accessed to optimize message passing performance.
336
337
```scala { .api }
338
class TripletFields(val useSrc: Boolean, val useDst: Boolean, val useEdge: Boolean) {
  // Implemented as a Java class in Spark. Each flag declares whether a user function
  // reads the source attribute, destination attribute or edge attribute of an
  // EdgeTriplet / EdgeContext, so GraphX can skip data the function does not use.
}

object TripletFields {
  /** The function reads no triplet fields. */
  val None: TripletFields

  /** The function reads only the edge attribute. */
  val EdgeOnly: TripletFields

  /** The function reads the source vertex attribute and the edge attribute. */
  val Src: TripletFields

  /** The function reads the destination vertex attribute and the edge attribute. */
  val Dst: TripletFields

  /** The function may read every field; this is the default. */
  val All: TripletFields
}
358
```
359
360
**Usage Examples:**
361
362
```scala
363
// Tell aggregateMessages which triplet fields sendMsg reads via TripletFields

// In-degree: the message is a constant, so no triplet field is read
val inDegrees = graph.aggregateMessages[Int](
  sendMsg = _.sendToDst(1),
  mergeMsg = _ + _,
  tripletFields = TripletFields.None
)

// Weighted in-degree: only the edge attribute is read
val weightedInDegrees = graph.aggregateMessages[Double](
  sendMsg = edgeCtx => edgeCtx.sendToDst(edgeCtx.attr),
  mergeMsg = _ + _,
  tripletFields = TripletFields.EdgeOnly
)

// Sum over in-neighbours: only the source vertex attribute is read
val neighborSum = graph.aggregateMessages[Double](
  sendMsg = edgeCtx => edgeCtx.sendToDst(edgeCtx.srcAttr),
  mergeMsg = _ + _,
  tripletFields = TripletFields.Src
)
381
```
382
383
## Performance Optimization Patterns
384
385
### Graph Construction Optimization
386
387
```scala
388
// Efficient graph construction for large datasets
/**
 * Builds a graph from vertex and edge RDDs, repartitions the edges with the 2D
 * strategy, and caches the result.
 *
 * @param vertices vertex IDs with their string attributes
 * @param edges edges with Double attributes
 * @param numPartitions number of edge partitions to produce (default 100)
 * @return the partitioned, cached graph; vertices that appear only in `edges`
 *         get the attribute "Unknown"
 */
def buildLargeGraph(
  vertices: RDD[(VertexId, String)],
  edges: RDD[Edge[Double]],
  numPartitions: Int = 100
): Graph[String, Double] = {

  // RDD[Edge[_]] is not a key/value RDD and has no partitionBy; edge placement is
  // handled by Graph.partitionBy below instead.
  val graph = Graph(
    vertices,
    edges,
    defaultVertexAttr = "Unknown",
    edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER, // serialized, spills to disk
    vertexStorageLevel = StorageLevel.MEMORY_ONLY
  )

  // 2D partitioning bounds vertex replication; numPartitions sets the edge partition count
  graph.partitionBy(PartitionStrategy.EdgePartition2D, numPartitions).cache()
}
406
```
407
408
### Iterative Algorithm Optimization
409
410
```scala
411
// Optimize for iterative algorithms
/**
 * Repartitions `graph` with the 2D strategy, caches it, and marks it for checkpointing.
 *
 * A checkpoint directory must be set (sc.setCheckpointDir) beforehand, otherwise
 * checkpoint() fails. The checkpoint is written when the graph is first materialized
 * by an action.
 *
 * @return the repartitioned, cached graph
 */
def optimizeForIterativeAlgorithms[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): Graph[VD, ED] = {
  val prepared = graph
    .partitionBy(PartitionStrategy.EdgePartition2D) // Better load balancing
    .cache()                                        // Cache for multiple iterations

  // Graph.checkpoint() returns Unit, so it cannot be chained. It must be called
  // before any action runs on the graph.
  prepared.checkpoint()
  prepared
}

// Use with iterative algorithms
sc.setCheckpointDir("path/to/checkpoints") // Required before Graph.checkpoint()
val optimizedGraph = optimizeForIterativeAlgorithms(graph)
val pageRanks = optimizedGraph.pageRank(0.001)
val components = optimizedGraph.connectedComponents()

// Clean up when done (the algorithm results are cached graphs as well)
pageRanks.unpersist()
components.unpersist()
optimizedGraph.unpersist()
429
```
430
431
### Memory Management
432
433
```scala
434
// Manage memory usage for large graphs
/**
 * Runs PageRank on `graph` with serialized memory-and-disk storage, processes the ranks
 * partition by partition on the executors, and then releases the cached data.
 *
 * Note: persist throws UnsupportedOperationException if `graph` is already persisted
 * at a different storage level.
 */
def memoryEfficientProcessing[VD: ClassTag, ED: ClassTag](
  graph: Graph[VD, ED]
): Unit = {

  // Use serialized storage for large graphs
  val efficientGraph = graph.persist(StorageLevel.MEMORY_AND_DISK_SER)

  try {
    // Process graph
    val results = efficientGraph.pageRank(0.001)

    try {
      // Process results on the executors instead of collecting them to the driver
      results.vertices.foreachPartition { iter =>
        iter.foreach { case (id, rank) =>
          // Process each result
        }
      }
    } finally {
      // The PageRank result graph may hold cached RDDs of its own; release them too
      results.unpersist(blocking = false)
    }
  } finally {
    // Always clean up
    efficientGraph.unpersist(blocking = false)
  }
}
458
```
459
460
### Kryo Serialization Setup
461
462
```scala
463
// Configure Spark for GraphX
/**
 * Creates a SparkContext that uses Kryo serialization with GraphX's classes registered.
 *
 * @param appName application name shown in the Spark UI
 * @return a new SparkContext
 */
def configureSparkForGraphX(appName: String): SparkContext = {
  val conf = new SparkConf()
    .setAppName(appName)
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Optional tuning: schedule tasks immediately instead of waiting for data-local slots
    .set("spark.locality.wait", "0s")

  // Register GraphX classes with Kryo. This replaces the GraphKryoRegistrator
  // (spark.kryo.registrator), which has been deprecated since Spark 1.2.0.
  // spark.sql.adaptive.* settings are not needed: they only affect Spark SQL,
  // and GraphX is RDD-based.
  GraphXUtils.registerKryoClasses(conf)

  new SparkContext(conf)
}
477
```
478
479
## Common Utility Patterns
480
481
### Graph Validation and Debugging
482
483
```scala
484
// Validate graph structure and properties
/**
 * Prints vertex and edge counts, the number of self-loops, degree statistics, and the
 * number of connected components of `graph`.
 *
 * The ClassTag bounds are required. numVertices, numEdges, degrees and
 * connectedComponents are GraphOps methods, and the implicit conversion to GraphOps
 * needs ClassTags for both attribute types.
 */
def validateGraph[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Unit = {
  val numVertices = graph.numVertices
  val numEdges = graph.numEdges
  println(s"Vertices: $numVertices")
  println(s"Edges: $numEdges")

  // Check for self-loops
  val selfLoops = graph.edges.filter(e => e.srcId == e.dstId).count()
  println(s"Self-loops: $selfLoops")

  // Check degree distribution. graph.degrees omits degree-0 vertices, so RDD.max()
  // would throw on an edgeless graph and mean() would ignore isolated vertices.
  // Instead, fold with a zero value, and derive the average from the counts
  // (each edge adds 2 to the total degree).
  val maxDegree = graph.degrees.map(_._2).fold(0)(_ max _)
  val avgDegree = if (numVertices == 0) 0.0 else 2.0 * numEdges / numVertices
  println(s"Max degree: $maxDegree, Average degree: $avgDegree")

  // Check connectivity
  val components = graph.connectedComponents().vertices.map(_._2).distinct().count()
  println(s"Connected components: $components")
}
503
```
504
505
### Graph Format Conversion
506
507
```scala
508
// Convert between different graph representations
/**
 * Builds an out-neighbour adjacency list for every vertex. Vertices without
 * out-edges are included with an empty array.
 *
 * The ClassTag bound on ED is required: collectNeighborIds is a GraphOps method, and
 * the implicit conversion to GraphOps needs ClassTags for both attribute types.
 */
def convertEdgeListToAdjacencyList[ED: ClassTag](
  graph: Graph[Long, ED]
): RDD[(VertexId, Array[VertexId])] = {
  graph.collectNeighborIds(EdgeDirection.Out)
}
512
513
def saveGraphToEdgeList[VD, ED](graph: Graph[VD, ED], path: String): Unit = {
  // One "srcId dstId" line per edge; edge attributes are not written.
  val edgeLines = graph.edges.map { e =>
    Seq(e.srcId, e.dstId).mkString(" ")
  }
  edgeLines.saveAsTextFile(path)
}
518
519
/**
 * Loads a graph from an adjacency list file.
 *
 * Each line is "srcId nbr1 nbr2 ...", separated by whitespace. Blank lines and lines
 * starting with '#' are skipped, and surrounding whitespace is ignored. Every listed
 * source becomes a vertex, even if it has no neighbours.
 *
 * @return graph with vertex attributes 0 and edge attributes 1
 */
def loadGraphFromAdjacencyList(sc: SparkContext, path: String): Graph[Int, Int] = {
  val adjacencyList = sc.textFile(path)
    .map(_.trim) // Leading whitespace would otherwise produce an empty first token
    .filter(line => line.nonEmpty && !line.startsWith("#"))
    .map { line =>
      val parts = line.split("\\s+")
      (parts(0).toLong, parts.tail.map(_.toLong))
    }

  val edges = adjacencyList.flatMap { case (src, neighbors) =>
    neighbors.map(dst => Edge(src, dst, 1))
  }

  // Graph.fromEdges would keep only vertices that appear in some edge; building the
  // vertex RDD explicitly preserves sources listed without neighbours.
  val vertices = adjacencyList.map { case (src, _) => (src, 0) }

  Graph(vertices, edges, defaultVertexAttr = 0)
}
533
```