# Graph Loading and Utilities

Utilities for loading graphs from files, partitioning strategies, configuration options, and various helper functions for graph construction and management.

## Capabilities

### Graph Loading

Utilities for loading graphs from external data sources and files.

```scala { .api }
/**
 * Graph loading utilities
 */
object GraphLoader {
  /**
   * Load a graph from an edge list file.
   * Each line should contain "srcId dstId"; any additional fields are ignored and
   * every edge receives the attribute 1. Lines beginning with `#` are skipped as comments.
   * @param sc SparkContext
   * @param path Path to the edge list file
   * @param canonicalOrientation If true, orient edges in the positive direction (srcId < dstId), useful for undirected graphs
   * @param numEdgePartitions Number of partitions for edges (-1 for the default parallelism)
   * @param edgeStorageLevel Storage level for the edge RDD
   * @param vertexStorageLevel Storage level for the vertex RDD
   * @return Graph with Int vertex attributes and Int edge attributes
   */
  def edgeListFile(
      sc: SparkContext,
      path: String,
      canonicalOrientation: Boolean = false,
      numEdgePartitions: Int = -1,
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY
    ): Graph[Int, Int]
}
```

**Usage Examples:**

```scala
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Load graph from a simple edge list file
// File format: each line contains "srcId dstId"
val graph = GraphLoader.edgeListFile(sc, "/path/to/edges.txt")

// Load with canonical orientation (useful for undirected graphs)
val undirectedGraph = GraphLoader.edgeListFile(
  sc,
  "/path/to/edges.txt",
  canonicalOrientation = true // Ensures srcId < dstId
)

// Load with custom storage and partitioning
val optimizedGraph = GraphLoader.edgeListFile(
  sc,
  "/path/to/large_graph.txt",
  canonicalOrientation = false,
  numEdgePartitions = 100, // More partitions for large graphs
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_ONLY
)

// Example edge list file content:
// 1 2
// 2 3
// 3 1
// 1 4
```

### Custom Graph Loading Patterns

Common patterns for loading graphs from various data sources.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Load from CSV files with custom parsing
// Expected line format: "srcId,dstId,weight"
def loadGraphFromCSV(sc: SparkContext, path: String): Graph[String, Double] = {
  val edges = sc.textFile(path).map { line =>
    val parts = line.split(",")
    Edge(parts(0).toLong, parts(1).toLong, parts(2).toDouble)
  }

  val vertices = edges.flatMap(edge => Seq(edge.srcId, edge.dstId))
    .distinct()
    .map(id => (id, s"Vertex_$id"))

  Graph(vertices, edges)
}

// Load from JSON data (one JSON object per line)
def loadGraphFromJSON(sc: SparkContext, path: String): Graph[Map[String, String], String] = {
  import org.json4s._
  import org.json4s.jackson.JsonMethods._
  implicit val formats: Formats = DefaultFormats // required by extract[...]

  val jsonData = sc.textFile(path).map(parse(_))

  // Extract vertices from JSON
  val vertices = jsonData.map { json =>
    val id = (json \ "id").extract[Long]
    val attributes = (json \ "attributes").extract[Map[String, String]]
    (id, attributes)
  }

  // Extract edges from JSON
  val edges = jsonData.flatMap { json =>
    val srcId = (json \ "id").extract[Long]
    (json \ "edges").extract[List[Map[String, String]]].map { edgeData =>
      val dstId = edgeData("target").toLong
      val edgeType = edgeData("type")
      Edge(srcId, dstId, edgeType)
    }
  }

  Graph(vertices, edges)
}

// Load from database query results
def loadGraphFromDatabase(sc: SparkContext): Graph[String, Int] = {
  // Assuming you have RDDs from database queries
  val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
    // Query results converted to (id, attribute) pairs
  ))

  val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
    // Query results converted to Edge objects
  ))

  Graph(vertices, edges)
}
```

### Partitioning Strategies

Different strategies for partitioning graph data across cluster nodes.

```scala { .api }
/**
 * Base trait for edge partitioning strategies
 */
trait PartitionStrategy extends Serializable {
  /**
   * Determine which partition an edge should be assigned to
   * @param src Source vertex ID
   * @param dst Destination vertex ID
   * @param numParts Total number of partitions
   * @return Partition ID for this edge
   */
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
}

/**
 * Built-in partitioning strategies
 */
object PartitionStrategy {
  /**
   * 2D edge partitioning using a sqrt(numParts) x sqrt(numParts) grid
   * Guarantees a vertex replication bound of 2 * sqrt(numParts)
   * Good for graphs with many high-degree vertices
   */
  case object EdgePartition2D extends PartitionStrategy

  /**
   * Hash partitioning using only the source vertex ID
   * Colocates all edges with the same source vertex
   * Good for algorithms that iterate over out-edges
   */
  case object EdgePartition1D extends PartitionStrategy

  /**
   * Random vertex cut partitioning
   * Assigns edges by hashing the (source, destination) pair
   * Colocates all edges in the same direction between a pair of vertices
   */
  case object RandomVertexCut extends PartitionStrategy

  /**
   * Canonical random vertex cut partitioning
   * Like RandomVertexCut, but both directions of an edge go to the same partition
   * Good for undirected graph algorithms
   */
  case object CanonicalRandomVertexCut extends PartitionStrategy
}
```

**Usage Examples:**

```scala
// Apply different partitioning strategies
val graph = loadGraph()

// 2D partitioning - good for high-degree vertices
val graph2D = graph.partitionBy(PartitionStrategy.EdgePartition2D)

// 1D partitioning - good for out-edge iteration
val graph1D = graph.partitionBy(PartitionStrategy.EdgePartition1D)

// Random vertex cut - general purpose
val graphRandom = graph.partitionBy(PartitionStrategy.RandomVertexCut)

// Canonical random - good for undirected graphs
val graphCanonical = graph.partitionBy(PartitionStrategy.CanonicalRandomVertexCut)

// Choose a strategy based on the workload
val maxDegree = graph.degrees.map(_._2).max()
val strategy = if (maxDegree > 1000) {
  PartitionStrategy.EdgePartition2D // Handle high-degree vertices
} else if (algorithmType == "PageRank") {
  PartitionStrategy.RandomVertexCut // Good for iterative algorithms
} else {
  PartitionStrategy.EdgePartition1D // Default choice
}

val optimizedGraph = graph.partitionBy(strategy)
```

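Because `PartitionStrategy` is a public trait, custom placement logic can also be passed to `partitionBy`. The following is a minimal sketch, not part of the GraphX API: a hypothetical `HubAwarePartitioner` that reserves a slice of partitions for edges touching a known set of hub vertices and hashes everything else.

```scala
import org.apache.spark.graphx._

// Hypothetical custom strategy: reserve roughly 10% of the partitions for edges
// incident to a small, known set of hub vertices, and hash all other edges.
class HubAwarePartitioner(hubs: Set[VertexId]) extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    if (numParts <= 1) {
      0
    } else {
      val reserved = math.max(1, numParts / 10) // partitions set aside for hub edges
      if (hubs.contains(src) || hubs.contains(dst)) {
        (math.abs(src + dst) % reserved).toInt
      } else {
        reserved + (math.abs(src * 31 + dst) % (numParts - reserved)).toInt
      }
    }
  }
}

// Usage (the hub IDs are assumed to be known in advance):
// val partitioned = graph.partitionBy(new HubAwarePartitioner(Set(1L, 42L)), 32)
```
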
### GraphX Utilities

Utility functions for GraphX configuration and optimization.

```scala { .api }
/**
 * GraphX utility functions
 */
object GraphXUtils {
  /**
   * Register GraphX classes with Kryo serialization for better performance
   * Call this before creating the SparkContext for optimal serialization
   * @param conf SparkConf to register the classes with
   */
  def registerKryoClasses(conf: SparkConf): Unit
}
```

**Usage Examples:**

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.GraphXUtils

// Configure Spark for optimal GraphX performance
val conf = new SparkConf()
  .setAppName("GraphX Application")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Register GraphX classes for efficient serialization
GraphXUtils.registerKryoClasses(conf)

val sc = new SparkContext(conf)

// Now create and process graphs with optimized serialization
val graph = loadGraph()
```

### Graph Generators

Utility functions for generating synthetic graphs for testing and benchmarking.

```scala { .api }
/**
 * Graph generation utilities for testing and benchmarking
 */
object GraphGenerators {
  /**
   * Generate a graph whose vertex out-degrees follow a log-normal distribution
   * @param sc SparkContext
   * @param numVertices Number of vertices to generate
   * @param numEParts Number of edge partitions (0 for the default parallelism)
   * @param mu Mean of the underlying normal distribution
   * @param sigma Standard deviation of the underlying normal distribution
   * @param seed Random seed (-1 for a random seed)
   * @return Generated graph with Long vertex attributes and Int edge attributes
   */
  def logNormalGraph(
      sc: SparkContext,
      numVertices: Int,
      numEParts: Int = 0,
      mu: Double = 4.0,
      sigma: Double = 1.3,
      seed: Long = -1
    ): Graph[Long, Int]

  /**
   * Generate an R-MAT (Recursive MATrix) graph
   * @param sc SparkContext
   * @param requestedNumVertices Approximate number of vertices
   * @param numEdges Number of edges to generate
   * @return Generated R-MAT graph
   */
  def rmatGraph(
      sc: SparkContext,
      requestedNumVertices: Int,
      numEdges: Int
    ): Graph[Int, Int]

  /**
   * Generate a star graph (one central vertex connected to all others)
   * @param sc SparkContext
   * @param nverts Total number of vertices
   * @return Star graph
   */
  def starGraph(sc: SparkContext, nverts: Int): Graph[Int, Int]

  /**
   * Generate a grid graph (2D grid topology)
   * @param sc SparkContext
   * @param rows Number of rows in the grid
   * @param cols Number of columns in the grid
   * @return Grid graph with (row, column) coordinate vertex attributes
   */
  def gridGraph(sc: SparkContext, rows: Int, cols: Int): Graph[(Int, Int), Double]
}
```

**Usage Examples:**

```scala
import scala.reflect.ClassTag

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Generate synthetic graphs for testing algorithms
val logNormalGraph = GraphGenerators.logNormalGraph(sc, 1000, numEParts = 4)
println(s"Log-normal graph: ${logNormalGraph.numVertices} vertices, ${logNormalGraph.numEdges} edges")

// Generate an R-MAT graph (commonly used benchmark)
val rmatGraph = GraphGenerators.rmatGraph(sc, requestedNumVertices = 1000, numEdges = 5000)
println(s"R-MAT graph: ${rmatGraph.numVertices} vertices, ${rmatGraph.numEdges} edges")

// Generate a star graph for testing centrality algorithms
val starGraph = GraphGenerators.starGraph(sc, 100)
val centerVertex = starGraph.degrees.max()(Ordering.by[(VertexId, Int), Int](_._2))
println(s"Star graph center vertex: ${centerVertex._1} with degree ${centerVertex._2}")

// Generate a grid graph for spatial algorithms
val gridGraph = GraphGenerators.gridGraph(sc, 10, 10)
gridGraph.vertices.take(5).foreach { case (id, (row, col)) =>
  println(s"Vertex $id at position ($row, $col)")
}

// Use synthetic graphs for algorithm benchmarking
def benchmarkPageRank[VD: ClassTag, ED: ClassTag](name: String, graph: Graph[VD, ED]): Unit = {
  val startTime = System.currentTimeMillis()
  graph.staticPageRank(10).vertices.count() // count() forces the computation
  println(s"$name graph PageRank: ${System.currentTimeMillis() - startTime}ms")
}

benchmarkPageRank("LogNormal", GraphGenerators.logNormalGraph(sc, 1000))
benchmarkPageRank("RMAT", GraphGenerators.rmatGraph(sc, 1000, 5000))
benchmarkPageRank("Star", GraphGenerators.starGraph(sc, 1000))
benchmarkPageRank("Grid", GraphGenerators.gridGraph(sc, 32, 32))
```

### Configuration and Performance Tuning

Best practices for configuring GraphX applications.

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// Spark configuration tuned for GraphX
val conf = new SparkConf()
  .setAppName("GraphX Application")
  // Serialization
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Memory management
  .set("spark.executor.memory", "8g")
  .set("spark.executor.cores", "4")
  // Adaptive query execution (affects Spark SQL jobs, not GraphX RDD operations)
  .set("spark.sql.adaptive.enabled", "true")
  .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  // GraphX specific
  .set("spark.graphx.pregel.checkpointInterval", "10")

GraphXUtils.registerKryoClasses(conf)

// Graph-specific optimizations
def optimizeGraph[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VD, ED] = {
  graph
    .partitionBy(PartitionStrategy.EdgePartition2D) // Good general partitioning
    .cache() // Cache for iterative algorithms
}

// Storage level selection based on graph size and available memory
def selectStorageLevel(numVertices: Long, numEdges: Long): StorageLevel = {
  if (numVertices > 10000000 || numEdges > 50000000) {
    StorageLevel.MEMORY_AND_DISK_SER // Large graphs - allow spilling to disk
  } else if (numVertices > 1000000) {
    StorageLevel.MEMORY_ONLY_SER // Medium graphs - serialize to save space
  } else {
    StorageLevel.MEMORY_ONLY // Small graphs - keep deserialized in memory
  }
}

// Dynamic partitioning based on graph characteristics
def selectPartitionStrategy[VD, ED](graph: Graph[VD, ED]): PartitionStrategy = {
  val maxDegree = graph.degrees.map(_._2).max()
  val avgDegree = graph.degrees.map(_._2).mean()

  if (maxDegree > avgDegree * 100) {
    PartitionStrategy.EdgePartition2D // High degree variance
  } else if (avgDegree > 50) {
    PartitionStrategy.RandomVertexCut // High average degree
  } else {
    PartitionStrategy.EdgePartition1D // Low-degree graphs
  }
}
```

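Pregel-based checkpointing (hinted at by `spark.graphx.pregel.checkpointInterval` above) also needs a checkpoint directory set on the SparkContext. A minimal sketch, assuming a shared filesystem path is available; the path is illustrative:

```scala
import org.apache.spark.SparkContext

// Set a checkpoint directory so iterative GraphX jobs can periodically truncate
// RDD lineage. Use a fault-tolerant store (HDFS, S3, ...) on a real cluster.
val sc = new SparkContext(conf)
sc.setCheckpointDir("/tmp/graphx-checkpoints") // illustrative path
```
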
### Data Import/Export Utilities

Utilities for saving and loading graphs in various formats.

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

// Save graph to text files
def saveGraph[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED],
    vertexPath: String,
    edgePath: String
  ): Unit = {
  // Save vertices as (id, attribute) pairs
  graph.vertices.map { case (id, attr) => s"$id\t$attr" }
    .saveAsTextFile(vertexPath)

  // Save edges as (src, dst, attr) triples
  graph.edges.map { edge => s"${edge.srcId}\t${edge.dstId}\t${edge.attr}" }
    .saveAsTextFile(edgePath)
}

// Load graph from the saved text files
def loadGraph(
    sc: SparkContext,
    vertexPath: String,
    edgePath: String
  ): Graph[String, String] = {
  val vertices = sc.textFile(vertexPath).map { line =>
    val parts = line.split("\t")
    (parts(0).toLong, parts(1))
  }

  val edges = sc.textFile(edgePath).map { line =>
    val parts = line.split("\t")
    Edge(parts(0).toLong, parts(1).toLong, parts(2))
  }

  Graph(vertices, edges)
}

// Export to the GraphML format (collects to the driver, so only suitable for small graphs)
def exportToGraphML[VD, ED](graph: Graph[VD, ED], path: String): Unit = {
  val vertices = graph.vertices.map { case (id, attr) =>
    s"""<node id="$id"><data key="vattr">$attr</data></node>"""
  }.collect()

  val edges = graph.edges.map { edge =>
    s"""<edge source="${edge.srcId}" target="${edge.dstId}"><data key="eattr">${edge.attr}</data></edge>"""
  }.collect()

  val graphML = s"""<?xml version="1.0" encoding="UTF-8"?>
<graphml xmlns="http://graphml.graphdrawing.org/xmlns">
  <key id="vattr" for="node" attr.name="attribute" attr.type="string"/>
  <key id="eattr" for="edge" attr.name="attribute" attr.type="string"/>
  <graph edgedefault="directed">
    ${vertices.mkString("\n    ")}
    ${edges.mkString("\n    ")}
  </graph>
</graphml>"""

  // Save to file (pseudo-code - the actual implementation depends on the file system)
  // writeToFile(path, graphML)
}
```

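The text round trip above stringifies attributes, so their original types are lost on reload. When a Spark-internal format is acceptable, a sketch using object files keeps the attribute types (attributes must be serializable); the helper names here are hypothetical:

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

// Persist vertices and edges with their original types using object files.
def saveGraphAsObjectFiles[VD: ClassTag, ED: ClassTag](
    graph: Graph[VD, ED], vertexPath: String, edgePath: String): Unit = {
  graph.vertices.saveAsObjectFile(vertexPath)
  graph.edges.saveAsObjectFile(edgePath)
}

// Reload with the same type parameters that were used when saving.
def loadGraphFromObjectFiles[VD: ClassTag, ED: ClassTag](
    sc: SparkContext, vertexPath: String, edgePath: String): Graph[VD, ED] = {
  val vertices = sc.objectFile[(VertexId, VD)](vertexPath)
  val edges = sc.objectFile[Edge[ED]](edgePath)
  Graph(vertices, edges)
}
```
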
## Performance Considerations

### Loading Optimization

- **File format**: Use compressed formats (gzip, snappy) for large edge lists
- **Partitioning**: Specify an appropriate number of partitions for large files
- **Storage levels**: Choose memory vs. disk based on graph size
- **Canonical orientation**: Use for undirected graphs so both directions of an edge collapse onto one canonical edge, which can then be deduplicated with `groupEdges` (see the sketch below)

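A minimal sketch combining these points, assuming `sc` is available; the path, partition count, and storage levels are illustrative:

```scala
import org.apache.spark.graphx._
import org.apache.spark.storage.StorageLevel

// GraphLoader reads gzip-compressed edge lists transparently via Hadoop text input.
val loaded = GraphLoader.edgeListFile(
  sc,
  "/data/edges.txt.gz",        // illustrative path
  canonicalOrientation = true, // orient every edge as srcId < dstId
  numEdgePartitions = 200,     // sized for a large input file
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER
)

// Identically oriented duplicate edges can be merged after co-partitioning them.
val deduplicated = loaded
  .partitionBy(PartitionStrategy.CanonicalRandomVertexCut)
  .groupEdges((a, b) => a + b) // edge attributes are Ints, so this counts multiplicity
```
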
### Partitioning Guidelines

- **EdgePartition2D**: Best for graphs with high-degree vertices (see the selection sketch below)
- **EdgePartition1D**: Good for algorithms that iterate over out-edges
- **RandomVertexCut**: General purpose, good for most iterative algorithms
- **CanonicalRandomVertexCut**: Use for undirected graph algorithms

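A minimal sketch that applies these guidelines to an already-loaded `graph` and then checks how evenly the chosen strategy spread the edges; the degree threshold is illustrative, not a GraphX constant:

```scala
import org.apache.spark.graphx._

// Pick a strategy from the guidelines above based on observed degree skew.
val maxDegree = graph.degrees.map(_._2).max()
val chosen =
  if (maxDegree > 10000) PartitionStrategy.EdgePartition2D // heavy hubs
  else PartitionStrategy.RandomVertexCut

val repartitioned = graph.partitionBy(chosen)

// Inspect edges per partition to confirm the strategy balanced the load.
val edgesPerPartition = repartitioned.edges
  .mapPartitions(iter => Iterator(iter.size))
  .collect()
println(s"Edges per partition: min=${edgesPerPartition.min}, max=${edgesPerPartition.max}")
```
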
### Memory Management

```scala
// Materialize the cached vertex and edge RDDs and report their element counts;
// check the Spark UI Storage tab for the actual memory usage.
def monitorGraphMemory[VD, ED](graph: Graph[VD, ED]): Unit = {
  val vertexCount = graph.vertices.cache().count()
  val edgeCount = graph.edges.cache().count()

  println(s"Cached vertices: $vertexCount")
  println(s"Cached edges: $edgeCount")

  // Unpersist when no longer needed
  graph.vertices.unpersist()
  graph.edges.unpersist()
}
```