# Graph Analytics and Metrics

Graph analytics operations for computing structural properties, degrees, neighborhoods, and graph metrics. They are available on any `Graph` through the `GraphOps` implicit class.

## Capabilities

### Graph Metrics

Basic structural metrics and properties of the graph.

```scala { .api }
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) {
  /** Total number of edges in the graph */
  lazy val numEdges: Long

  /** Total number of vertices in the graph */
  lazy val numVertices: Long

  /** In-degree of each vertex (edges pointing to vertex); vertices with no incoming edges are omitted */
  lazy val inDegrees: VertexRDD[Int]

  /** Out-degree of each vertex (edges originating from vertex); vertices with no outgoing edges are omitted */
  lazy val outDegrees: VertexRDD[Int]

  /** Total degree of each vertex (in-degree + out-degree); isolated vertices are omitted */
  lazy val degrees: VertexRDD[Int]
}
```

**Usage Examples:**

```scala
import org.apache.spark.graphx._

// Assumes `graph` is an existing Graph[VD, ED]

// Basic graph metrics
println(s"Graph has ${graph.numVertices} vertices")
println(s"Graph has ${graph.numEdges} edges")

// Degree statistics (computed only over vertices that appear in the degree RDDs)
val inDegreeStats = graph.inDegrees.map(_._2).stats()
val outDegreeStats = graph.outDegrees.map(_._2).stats()

println(s"Average in-degree: ${inDegreeStats.mean}")
println(s"Max out-degree: ${outDegreeStats.max}")

// Find the vertex with the highest total degree
val highestDegreeVertex = graph.degrees.reduce((a, b) => if (a._2 > b._2) a else b)
println(s"Highest degree vertex: ${highestDegreeVertex._1}, degree: ${highestDegreeVertex._2}")
```
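To make the degree semantics concrete, the following plain-Scala sketch recomputes in-, out-, and total degrees from a small edge list without Spark. `DegreeSketch` and its helpers are hypothetical names that only model what the `VertexRDD[Int]` results contain; they are not part of GraphX. Note that a vertex with no matching edges simply has no entry.

```scala
object DegreeSketch {
  // (srcId, dstId) pairs standing in for a GraphX edge RDD (an assumption of this sketch)
  type EdgeList = Seq[(Long, Long)]

  // Count how often each vertex ID occurs in the given sequence
  private def countById(ids: Seq[Long]): Map[Long, Int] =
    ids.groupBy(identity).map { case (id, occurrences) => id -> occurrences.size }

  // Edges pointing to each vertex
  def inDegrees(edges: EdgeList): Map[Long, Int] = countById(edges.map(_._2))

  // Edges originating from each vertex
  def outDegrees(edges: EdgeList): Map[Long, Int] = countById(edges.map(_._1))

  // Every edge contributes once to its source and once to its destination
  def degrees(edges: EdgeList): Map[Long, Int] =
    countById(edges.flatMap { case (src, dst) => Seq(src, dst) })
}
```

For the edges `1 -> 2`, `1 -> 3`, and `2 -> 3`, `DegreeSketch.inDegrees` has entries only for vertices 2 and 3, because vertex 1 has no incoming edge.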

### Neighborhood Collection

Operations to collect neighboring vertices and edges for each vertex.

```scala { .api }
/**
 * Collect vertex IDs of neighboring vertices
 * @param edgeDirection Direction of edges to follow (In, Out, or Either; Both is not supported)
 */
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]]

/**
 * Collect neighboring vertices with their attributes
 * @param edgeDirection Direction of edges to follow (In, Out, or Either; Both is not supported)
 */
def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexId, VD)]]

/**
 * Collect incident edges for each vertex
 * @param edgeDirection Direction of edges to collect (In, Out, or Either; Both is not supported)
 */
def collectEdges(edgeDirection: EdgeDirection): VertexRDD[Array[Edge[ED]]]
```

**Usage Examples:**

```scala
// Collect outgoing neighbor IDs for each vertex
val outNeighbors = graph.collectNeighborIds(EdgeDirection.Out)
outNeighbors.collect().foreach { case (vertexId, neighbors) =>
  println(s"Vertex $vertexId has outgoing neighbors: ${neighbors.mkString(", ")}")
}

// Collect incoming neighbors with attributes
val inNeighborsWithData = graph.collectNeighbors(EdgeDirection.In)
inNeighborsWithData.collect().foreach { case (vertexId, neighbors) =>
  val neighborNames = neighbors.map(_._2).mkString(", ")
  println(s"Vertex $vertexId has incoming neighbors: $neighborNames")
}

// Collect outgoing edges for each vertex
val outgoingEdges = graph.collectEdges(EdgeDirection.Out)
outgoingEdges.collect().foreach { case (vertexId, edges) =>
  println(s"Vertex $vertexId has ${edges.length} outgoing edges")
}

// Collect all incident edges
val allEdges = graph.collectEdges(EdgeDirection.Either)
allEdges.collect().foreach { case (vertexId, edges) =>
  println(s"Vertex $vertexId has ${edges.length} incident edges")
}
```
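The effect of the direction argument on neighbor collection can be sketched in plain Scala. `NeighborSketch` is a hypothetical model, not GraphX code; it assumes, in line with how `collectNeighborIds` appears to behave in current GraphX sources, that a vertex without neighbors in the requested direction still gets an entry with an empty collection.

```scala
object NeighborSketch {
  // Stand-ins for EdgeDirection.In / Out / Either (names local to this sketch)
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object Either extends Direction

  // For each vertex, the IDs of its neighbors over edges in the given direction
  def neighborIds(
      vertexIds: Seq[Long],
      edges: Seq[(Long, Long)],
      direction: Direction): Map[Long, Seq[Long]] = {
    // Build (vertex, neighbor) pairs according to the direction
    val pairs: Seq[(Long, Long)] = direction match {
      case Out    => edges                                              // source sees destination
      case In     => edges.map { case (src, dst) => (dst, src) }        // destination sees source
      case Either => edges.flatMap { case (src, dst) => Seq((src, dst), (dst, src)) }
    }
    val grouped = pairs.groupBy(_._1).map { case (id, ps) => id -> ps.map(_._2) }
    // Vertices without any neighbor keep an empty entry
    vertexIds.map(id => id -> grouped.getOrElse(id, Seq.empty[Long])).toMap
  }
}
```

With vertices 1, 2, 3 and edges `1 -> 2`, `1 -> 3`, `2 -> 3`, the `Out` direction gives vertex 3 an empty neighbor list, while `Either` gives every vertex the other two as neighbors.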

### Graph Cleaning Operations

Operations to clean and normalize graph structure.

```scala { .api }
/**
 * Remove self-edges (edges where source equals destination)
 */
def removeSelfEdges(): Graph[VD, ED]

/**
 * Convert to canonical edge format: every edge is rewritten so that
 * srcId < dstId, and the resulting duplicate edges are merged
 * @param mergeFunc Function to merge duplicate edge attributes (default keeps the first)
 */
def convertToCanonicalEdges(mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED]
```

**Usage Examples:**

```scala
// Remove self-loops from graph
val noSelfLoopsGraph = graph.removeSelfEdges()

// Convert to canonical form, merging parallel edges
// (this merge function assumes the edge attribute type ED is String)
val canonicalGraph = graph.convertToCanonicalEdges((a, b) => s"$a;$b")
```
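As a rough model of what canonicalization does, the sketch below orients every edge so that the smaller vertex ID is the source and then merges edges that end up with the same endpoints. `CanonicalEdgesSketch` and the `E` case class are hypothetical and run without Spark. In this sketch the merge order follows the input order; on a distributed edge RDD that order is not something to rely on, so a commutative and associative merge function is the safer choice.

```scala
object CanonicalEdgesSketch {
  // Minimal stand-in for a GraphX Edge[ED] (an assumption of this sketch)
  final case class E[A](srcId: Long, dstId: Long, attr: A)

  // Drop edges whose source equals their destination
  def removeSelfEdges[A](edges: Seq[E[A]]): Seq[E[A]] =
    edges.filter(e => e.srcId != e.dstId)

  // Orient each edge so that srcId <= dstId, then merge edges with equal endpoints
  def canonicalize[A](edges: Seq[E[A]])(merge: (A, A) => A): Seq[E[A]] =
    edges
      .map(e => if (e.srcId < e.dstId) e else E(e.dstId, e.srcId, e.attr))
      .groupBy(e => (e.srcId, e.dstId))
      .map { case ((src, dst), group) => E(src, dst, group.map(_.attr).reduce(merge)) }
      .toSeq
      .sortBy(e => (e.srcId, e.dstId)) // stable output order for readability
}
```

For example, the edges `2 -> 1` ("a") and `1 -> 2` ("b") collapse into a single edge `1 -> 2` whose attribute is the merge of "a" and "b".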

### Vertex Operations

Operations for joining and transforming vertex data.

```scala { .api }
/**
 * Join vertices with external data. Only vertices with a matching entry
 * are updated; all other vertices keep their original attribute.
 * @param table External RDD to join with
 * @param mapFunc Function to combine original and joined data
 */
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(
  mapFunc: (VertexId, VD, U) => VD
): Graph[VD, ED]

/**
 * Pick a random vertex ID from the graph
 */
def pickRandomVertex(): VertexId
```

**Usage Examples:**

```scala
import org.apache.spark.rdd.RDD

// Assumes `sc` is an active SparkContext and the vertex attribute type VD is String

// Join with external user data
val userProfiles: RDD[(VertexId, String)] = sc.parallelize(Array(
  (1L, "Engineer"), (2L, "Designer"), (3L, "Manager")
))

// Vertices without a matching profile keep their original attribute
val enrichedGraph = graph.joinVertices(userProfiles) { (id, name, profession) =>
  s"$name - $profession"
}

// Pick random vertex for sampling
val randomVertex = graph.pickRandomVertex()
println(s"Random vertex selected: $randomVertex")
```
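The join semantics can be modeled with ordinary maps: matched vertices are passed through the mapping function, unmatched vertices are left as they are, and table entries for unknown vertices are ignored. `JoinVerticesSketch` is a hypothetical plain-Scala stand-in, not the GraphX implementation.

```scala
object JoinVerticesSketch {
  // vertices: vertex ID -> attribute; table: vertex ID -> external value
  def joinVertices[VD, U](vertices: Map[Long, VD], table: Map[Long, U])(
      mapFunc: (Long, VD, U) => VD): Map[Long, VD] =
    vertices.map { case (id, attr) =>
      table.get(id) match {
        case Some(extra) => id -> mapFunc(id, attr, extra) // matched: attribute is updated
        case None        => id -> attr                     // unmatched: attribute is unchanged
      }
    }
}
```

With vertices 1 ("Alice"), 2 ("Bob"), and 4 ("Dana") and profiles for IDs 1, 2, and 3, only Alice and Bob are updated; Dana keeps her attribute and the profile for ID 3 has no effect.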
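
### Advanced Analytics Operations

Higher-level analytics operations and filtering.

```scala { .api }
/**
 * Filter the graph using values computed by a preprocessing step.
 * The predicates are evaluated on the preprocessed graph, but the result
 * keeps the vertex and edge attributes of the original graph.
 * @param preprocess Function that computes the values to filter on
 * @param epred Edge predicate applied to the preprocessed triplets (default keeps all edges)
 * @param vpred Vertex predicate applied to the preprocessed vertices (default keeps all vertices)
 */
def filter[VD2: ClassTag, ED2: ClassTag](
  preprocess: Graph[VD, ED] => Graph[VD2, ED2],
  epred: EdgeTriplet[VD2, ED2] => Boolean = (x: EdgeTriplet[VD2, ED2]) => true,
  vpred: (VertexId, VD2) => Boolean = (v: VertexId, d: VD2) => true
): Graph[VD, ED]
```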
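This section has no usage example, so here is a minimal sketch of filtering by a computed property. It computes out-degrees in `preprocess`, keeps vertices with at least one outgoing edge through `vpred` (the optional vertex predicate of GraphX's `filter`), and returns a graph that still carries the original attributes. It assumes Spark and GraphX are on the classpath; `FilterSketch`, `dropSinks`, the local master setting, and the sample data are all hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object FilterSketch {
  // Keep only vertices with at least one outgoing edge.
  // The out-degree is used for filtering only; the result keeps the original String attributes.
  def dropSinks(graph: Graph[String, String]): Graph[String, String] =
    graph.filter[Int, String](
      preprocess = g => g.outerJoinVertices(g.outDegrees) { (_, _, deg) => deg.getOrElse(0) },
      vpred = (_: VertexId, outDegree: Int) => outDegree > 0
    )

  def main(args: Array[String]): Unit = {
    // Local master chosen for illustration only
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("filter-sketch"))
    try {
      val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b"), (3L, "c")))
      val edges = sc.parallelize(Seq(Edge(1L, 2L, "x"), Edge(2L, 3L, "y")))
      val filtered = dropSinks(Graph(vertices, edges))
      filtered.vertices.collect().foreach(println)
      filtered.edges.collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

In the sample graph, vertex 3 has no outgoing edge, so it is removed together with the edge `2 -> 3` that points to it; vertices 1 and 2 and the edge `1 -> 2` remain.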

### Algorithm Integration Methods

Direct access to graph algorithms through GraphOps.

```scala { .api }
/**
 * Run PageRank until convergence
 * @param tol Convergence tolerance
 * @param resetProb Random reset probability (default 0.15)
 */
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

/**
 * Personalized PageRank from a specific source vertex
 * @param src Source vertex for personalization
 * @param tol Convergence tolerance
 * @param resetProb Random reset probability (default 0.15)
 */
def personalizedPageRank(src: VertexId, tol: Double, resetProb: Double = 0.15): Graph[Double, Double]

/**
 * Static PageRank with a fixed number of iterations
 * @param numIter Number of iterations
 * @param resetProb Random reset probability (default 0.15)
 */
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double]

/**
 * Connected components algorithm; each vertex is labeled with the
 * lowest vertex ID in its component
 */
def connectedComponents(): Graph[VertexId, ED]

/**
 * Triangle counting algorithm; each vertex is labeled with the number
 * of triangles passing through it
 */
def triangleCount(): Graph[Int, ED]

/**
 * Strongly connected components with iteration limit
 * @param numIter Maximum number of iterations
 */
def stronglyConnectedComponents(numIter: Int): Graph[VertexId, ED]
```

**Usage Examples:**

```scala
// Run PageRank analysis
val pageRankResults = graph.pageRank(0.0001)
val topVertices = pageRankResults.vertices.top(5)(Ordering.by(_._2))
println("Top 5 vertices by PageRank:")
topVertices.foreach { case (id, rank) => println(s"Vertex $id: $rank") }

// Find connected components
val components = graph.connectedComponents()
val componentSizes = components.vertices.map(_._2).countByValue()
println(s"Found ${componentSizes.size} connected components")

// Count triangles: each triangle is counted once per corner, hence the division by 3
val triangleCounts = graph.triangleCount()
val totalTriangles = triangleCounts.vertices.map(_._2).sum().toLong / 3
println(s"Total triangles in graph: $totalTriangles")

// Personalized PageRank from specific vertex
val personalizedPR = graph.personalizedPageRank(1L, 0.001)
val personalizedScores = personalizedPR.vertices.collect()
personalizedScores.foreach { case (id, score) =>
  println(s"Personalized PageRank from vertex 1 to vertex $id: $score")
}
```
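The division by 3 in the triangle example follows from how per-vertex counts relate to the total: every triangle is counted once at each of its three corners. The plain-Scala sketch below illustrates this on an undirected view of an edge list. `TriangleSketch` is a hypothetical, unoptimized model and not the algorithm GraphX uses internally.

```scala
object TriangleSketch {
  // Number of triangles passing through each vertex that appears in an edge
  def triangleCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
    // Undirected neighbor sets, ignoring self-loops and duplicate edges
    val neighbors: Map[Long, Set[Long]] =
      edges
        .filter { case (src, dst) => src != dst }
        .flatMap { case (src, dst) => Seq((src, dst), (dst, src)) }
        .groupBy(_._1)
        .map { case (id, pairs) => id -> pairs.map(_._2).toSet }

    neighbors.map { case (v, nbrs) =>
      // Each pair of neighbors of v that are themselves connected closes a triangle at v
      val closedPairs = for {
        a <- nbrs.toSeq
        b <- nbrs.toSeq
        if a < b && neighbors(a).contains(b)
      } yield (a, b)
      v -> closedPairs.size
    }
  }

  // Every triangle is counted once at each of its three corners
  def totalTriangles(edges: Seq[(Long, Long)]): Int =
    triangleCounts(edges).values.sum / 3
}
```

For the edges `1 -> 2`, `2 -> 3`, `3 -> 1`, and `3 -> 4`, vertices 1, 2, and 3 each see one triangle and vertex 4 sees none, so the per-vertex counts sum to 3 and the total is 1.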

### Pregel Integration

Direct access to the Pregel computation framework.

```scala { .api }
/**
 * Run Pregel computation using vertex-centric programming model
 * @param initialMsg Initial message sent to all vertices
 * @param maxIterations Maximum number of iterations
 * @param activeDirection Direction of edges, relative to a vertex that received a
 *                        message in the previous round, along which sendMsg runs
 * @param vprog Vertex program - how vertices update their state
 * @param sendMsg Edge program - what messages to send along edges
 * @param mergeMsg Message merge function
 */
def pregel[A: ClassTag](
  initialMsg: A,
  maxIterations: Int = Int.MaxValue,
  activeDirection: EdgeDirection = EdgeDirection.Either
)(
  vprog: (VertexId, VD, A) => VD,
  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
  mergeMsg: (A, A) => A
): Graph[VD, ED]
```

**Usage Examples:**

```scala
// Implement single-source shortest paths (hop counts) using Pregel
val sourceId: VertexId = 1L

// Initialize distances: 0 for source, infinity for others
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity
)

val shortestPaths = initialGraph.pregel(Double.PositiveInfinity)(
  // Vertex program: keep the smaller of the current and the received distance
  (id, dist, newDist) => math.min(dist, newDist),

  // Send message: offer the destination a shorter path if one exists.
  // Every edge is treated as having weight 1.0 in this example.
  triplet => {
    if (triplet.srcAttr + 1.0 < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + 1.0))
    } else {
      Iterator.empty
    }
  },

  // Merge messages: take minimum distance
  (a, b) => math.min(a, b)
)
```
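To show how `vprog`, `sendMsg`, and `mergeMsg` interact across supersteps, here is a simplified plain-Scala model of the loop. `PregelSketch`, `Triplet`, and `hopDistances` are hypothetical names. As a deliberate simplification, the sketch runs `sendMsg` on every edge in each round, whereas GraphX restricts it to edges next to vertices that received a message; for the hop-count example the result is the same.

```scala
object PregelSketch {
  // Minimal stand-in for EdgeTriplet, without edge attributes
  final case class Triplet[VD](srcId: Long, srcAttr: VD, dstId: Long, dstAttr: VD)

  def pregel[VD, A](
      vertices: Map[Long, VD],
      edges: Seq[(Long, Long)],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue)(
      vprog: (Long, VD, A) => VD,
      sendMsg: Triplet[VD] => Iterator[(Long, A)],
      mergeMsg: (A, A) => A): Map[Long, VD] = {
    // Superstep 0: every vertex receives the initial message
    var state = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var iteration = 0
    var active = true
    while (active && iteration < maxIterations) {
      // Run the edge program, then combine all messages addressed to the same vertex
      val inbox: Map[Long, A] = edges
        .flatMap { case (src, dst) => sendMsg(Triplet(src, state(src), dst, state(dst))) }
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).reduce(mergeMsg) }
      if (inbox.isEmpty) {
        active = false // no messages: the computation has converged
      } else {
        // Only vertices that received a message run the vertex program
        state = state.map { case (id, attr) =>
          id -> inbox.get(id).map(msg => vprog(id, attr, msg)).getOrElse(attr)
        }
        iteration += 1
      }
    }
    state
  }

  // Hop-count shortest paths from `source`, mirroring the GraphX example above
  def hopDistances(vertexIds: Seq[Long], edges: Seq[(Long, Long)], source: Long): Map[Long, Double] = {
    val initial = vertexIds.map(id => id -> (if (id == source) 0.0 else Double.PositiveInfinity)).toMap
    pregel(initial, edges, Double.PositiveInfinity)(
      (_, dist, msg) => math.min(dist, msg),
      t => if (t.srcAttr + 1.0 < t.dstAttr) Iterator((t.dstId, t.srcAttr + 1.0)) else Iterator.empty,
      (a, b) => math.min(a, b)
    )
  }
}
```

Vertices that cannot be reached from the source never receive an improving message and therefore keep `Double.PositiveInfinity` as their distance.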

## Types

### EdgeDirection

Enumeration for specifying edge directions in neighborhood operations.

```scala { .api }
sealed abstract class EdgeDirection {
  def reverse: EdgeDirection
}

object EdgeDirection {
  /** Edges arriving at a vertex (incoming edges) */
  case object In extends EdgeDirection

  /** Edges originating from a vertex (outgoing edges) */
  case object Out extends EdgeDirection

  /** Edges in either direction (incoming OR outgoing) */
  case object Either extends EdgeDirection

  /** Edges in both directions (incoming AND outgoing) */
  case object Both extends EdgeDirection
}
```
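The behavior of `reverse` can be illustrated with a small stand-in model: `In` and `Out` swap, while `Either` and `Both` map to themselves. `EdgeDirectionSketch` and `Dir` are hypothetical names used only for this sketch; they mirror the shape of the listing above rather than the actual GraphX class.

```scala
object EdgeDirectionSketch {
  sealed trait Dir { def reverse: Dir }

  // Incoming and outgoing directions are each other's reverse
  case object In extends Dir { def reverse: Dir = Out }
  case object Out extends Dir { def reverse: Dir = In }

  // Symmetric directions are unchanged by reversal
  case object Either extends Dir { def reverse: Dir = Either }
  case object Both extends Dir { def reverse: Dir = Both }
}
```

Reversing twice always returns the original direction, which is convenient when an operation is defined on a graph and on its edge-reversed counterpart.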