# Graph Analytics

Built-in degree metrics, neighborhood reduction operations, and user-defined reduction functions for graph analysis and statistical computation.

## Capabilities

### Degree Calculations

Compute vertex degrees to analyze how vertices are connected.

```scala { .api }
/**
 * Return the in-degree of all vertices in the graph
 * @return A DataSet of (vertexId, inDegree) tuples
 */
def inDegrees(): DataSet[(K, LongValue)]

/**
 * Return the out-degree of all vertices in the graph
 * @return A DataSet of (vertexId, outDegree) tuples
 */
def outDegrees(): DataSet[(K, LongValue)]

/**
 * Return the degree (in-degree plus out-degree) of all vertices in the graph
 * @return A DataSet of (vertexId, degree) tuples
 */
def getDegrees(): DataSet[(K, LongValue)]
```
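The three methods differ only in which endpoint of an edge gets counted. The plain-Scala sketch below is a hypothetical in-memory model, not Gelly code. `DegreeModel` and its edge-pair representation are illustrative assumptions. It assumes that vertices without edges are reported with degree 0 and that the total degree is in-degree plus out-degree.

```scala
// Hypothetical in-memory model of inDegrees/outDegrees/getDegrees (not Gelly code).
// A directed edge is modeled as a (sourceId, targetId) pair.
object DegreeModel {
  type Edge = (Long, Long)

  // Number of edges whose target is v, for every vertex (0 if none).
  def inDegrees(vertices: Seq[Long], edges: Seq[Edge]): Map[Long, Long] =
    vertices.map(v => v -> edges.count(_._2 == v).toLong).toMap

  // Number of edges whose source is v, for every vertex (0 if none).
  def outDegrees(vertices: Seq[Long], edges: Seq[Edge]): Map[Long, Long] =
    vertices.map(v => v -> edges.count(_._1 == v).toLong).toMap

  // Total degree: each edge counts once at its source and once at its target.
  def degrees(vertices: Seq[Long], edges: Seq[Edge]): Map[Long, Long] = {
    val in = inDegrees(vertices, edges)
    val out = outDegrees(vertices, edges)
    vertices.map(v => v -> (in(v) + out(v))).toMap
  }
}
```

Consider the edges 1→2, 2→3, 3→4 and 1→4. For these, `inDegrees` maps vertex 1 to 0 and vertex 4 to 2, and `degrees` assigns 2 to every vertex.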

### Neighborhood Reduction Operations

Reduce the values in each vertex's neighborhood (neighbor vertex values or adjacent edge values) to a single value of the same type with a user-defined pairwise function.

```scala { .api }
/**
 * Compute a reduce transformation over the neighbors' vertex values of each vertex.
 * For each vertex, the transformation consecutively calls a
 * ReduceNeighborsFunction until only a single value for each vertex remains.
 * The ReduceNeighborsFunction combines a pair of neighbor vertex values
 * into one new value of the same type.
 * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
 * @param direction the edge direction (in-, out-, all-)
 * @return a DataSet of tuples, with one tuple per vertex.
 * The first field of the tuple is the vertex ID and the second field
 * is the aggregate value computed by the provided ReduceNeighborsFunction.
 */
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV],
                      direction: EdgeDirection): DataSet[(K, VV)]

/**
 * Compute a reduce transformation over the edge values of each vertex.
 * For each vertex, the transformation consecutively calls a
 * ReduceEdgesFunction until only a single value for each vertex remains.
 * The ReduceEdgesFunction combines a pair of edge values
 * into one new value of the same type.
 * @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
 * @param direction the edge direction (in-, out-, all-)
 * @return a DataSet of tuples, with one tuple per vertex.
 * The first field of the tuple is the vertex ID and the second field
 * is the aggregate value computed by the provided ReduceEdgesFunction.
 */
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV],
                  direction: EdgeDirection): DataSet[(K, EV)]
```
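Both methods follow the same pattern: gather the values in a vertex's neighborhood, then fold them pairwise with the user function. The plain-Scala sketch below models that behavior on in-memory collections. The `NeighborReduce` object and its `Direction` cases are hypothetical stand-ins for Gelly's `Graph` and `EdgeDirection`. Skipping vertices that have nothing to reduce is a choice of this model, not a documented Gelly guarantee.

```scala
// Hypothetical in-memory model of reduceOnNeighbors / reduceOnEdges (not Gelly code).
object NeighborReduce {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  final case class Edge[EV](src: Long, trg: Long, value: EV)

  // (edge, neighborId) pairs of vertex v; All is the union of the In and Out pairs.
  def neighborhood[EV](v: Long, edges: Seq[Edge[EV]], dir: Direction): Seq[(Edge[EV], Long)] =
    dir match {
      case In  => edges.filter(_.trg == v).map(e => (e, e.src))
      case Out => edges.filter(_.src == v).map(e => (e, e.trg))
      case All => neighborhood(v, edges, In) ++ neighborhood(v, edges, Out)
    }

  // Fold the neighbors' vertex values pairwise with f; vertices without neighbors are skipped.
  def reduceOnNeighbors[VV, EV](values: Map[Long, VV], edges: Seq[Edge[EV]], dir: Direction)(
      f: (VV, VV) => VV): Map[Long, VV] =
    values.keys.flatMap { v =>
      val neighborValues = neighborhood(v, edges, dir).map { case (_, n) => values(n) }
      if (neighborValues.isEmpty) None else Some(v -> neighborValues.reduce(f))
    }.toMap

  // Fold the adjacent edge values pairwise with f; vertices without edges are skipped.
  def reduceOnEdges[EV](vertexIds: Iterable[Long], edges: Seq[Edge[EV]], dir: Direction)(
      f: (EV, EV) => EV): Map[Long, EV] =
    vertexIds.flatMap { v =>
      val edgeValues = neighborhood(v, edges, dir).map { case (e, _) => e.value }
      if (edgeValues.isEmpty) None else Some(v -> edgeValues.reduce(f))
    }.toMap
}
```

Take the edges 1→2 (1.5), 2→3 (2.5), 3→4 (3.5) and 1→4 (4.5), with vertex values 10.0, 20.0, 30.0 and 40.0. With `All` and `math.max`, vertex 1 maps to 40.0. With `Out` and addition, vertex 1 maps to 6.0 and vertex 4, which has no outgoing edges, is absent.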

### Group Reduction Operations

Perform more complex aggregations with user-defined functions. These functions receive a vertex's entire neighborhood in one call and can emit any number of results of an arbitrary type.

```scala { .api }
/**
 * Compute an aggregate over the edges of each vertex. The function applied
 * on the edges has access to the vertex value.
 * @param edgesFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a DataSet of T
 */
def groupReduceOnEdges[T](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T],
                          direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the edges of each vertex. The function applied
 * on the edges only has access to the vertex ID (not the vertex value).
 * @param edgesFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a DataSet of T
 */
def groupReduceOnEdges[T](edgesFunction: EdgesFunction[K, EV, T],
                          direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the neighbors (edges and vertices) of each
 * vertex. The function applied on the neighbors has access to the vertex
 * value.
 * @param neighborsFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a DataSet of T
 */
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T],
                              direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the neighbors (edges and vertices) of each
 * vertex. The function applied on the neighbors only has access to the
 * vertex ID (not the vertex value).
 * @param neighborsFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a DataSet of T
 */
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunction[K, VV, EV, T],
                              direction: EdgeDirection): DataSet[T]
```
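Unlike the pairwise reductions, a group-reduce function sees a vertex's whole neighborhood in one call. It emits zero or more results, possibly of a different type, through a `Collector`. The hypothetical plain-Scala model below captures this for outgoing edges only, with an `emit` callback standing in for `Collector.collect`. Groups are visited in vertex-ID order only to make the model's output deterministic. A distributed runtime gives no such ordering.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical in-memory model of groupReduceOnEdges with EdgeDirection.OUT (not Gelly code).
object GroupReduce {
  final case class Edge(src: Long, trg: Long, weight: Double)

  // Calls f once per source vertex with all of its out-edges; f may emit 0..n results of type T.
  def groupReduceOnOutEdges[T](edges: Seq[Edge])(f: (Long, Seq[Edge], T => Unit) => Unit): List[T] = {
    val results = ListBuffer.empty[T]
    val emit: T => Unit = t => { results += t; () }
    edges.groupBy(_.src).toSeq.sortBy(_._1).foreach { case (v, vertexEdges) =>
      f(v, vertexEdges, emit)
    }
    results.toList
  }
}
```

A function that emits `(v, edges.size)` produces one record per vertex. A function that emits only edges heavier than some threshold can produce several records for one vertex and none for another, which a same-typed pairwise reduce cannot express.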

## Function Interfaces

### Reduction Function Types

Key function interfaces for implementing custom neighborhood reductions:

```scala { .api }
// Java interfaces in org.apache.flink.graph, shown here in Scala notation.
// Each combines two values into one new value of the same type.
trait ReduceNeighborsFunction[VV] {
  def reduceNeighbors(firstNeighborValue: VV, secondNeighborValue: VV): VV
}

trait ReduceEdgesFunction[EV] {
  def reduceEdges(firstEdgeValue: EV, secondEdgeValue: EV): EV
}
```
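The caller does not control the order in which pairs of values are combined. A reduce function should therefore be associative and commutative; otherwise results can depend on how the data is partitioned and combined. The small helper below is hypothetical and not part of Gelly. It spot-checks a candidate function on sample values by reducing them in three different orders. Passing the check is not a proof. Also, floating-point addition is only approximately associative, so sums of doubles can differ in the last bits.

```scala
// Hypothetical helper: spot-check that a pairwise reduce function gives the same
// result regardless of evaluation order on a given sample (not a proof of the laws).
object ReduceLaws {
  def orderIndependent[A](sample: Seq[A])(f: (A, A) => A): Boolean = {
    val results = Set(
      sample.reduceLeft(f),        // ((a op b) op c) ...
      sample.reduceRight(f),       // a op (b op (c ...))
      sample.reverse.reduceLeft(f) // left-to-right grouping with the operand order reversed
    )
    results.size == 1
  }
}
```

On the sample `Seq(10.0, 20.0, 40.0)`, `math.max` and `+` pass the check, while subtraction fails it.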

### Edge Direction Enumeration

Control which edges to consider in neighborhood operations:

```java { .api }
// Java enum in org.apache.flink.graph
public enum EdgeDirection {
    IN,  // consider only incoming edges
    OUT, // consider only outgoing edges
    ALL  // consider both incoming and outgoing edges
}
```
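The plain-Scala sketch below is a hypothetical `DirectionModel`, not Gelly code. It shows which neighbor IDs a vertex would see under each direction, modeling `ALL` as the union of the `IN` and `OUT` neighborhoods. Under that assumption, a neighbor joined by edges in both directions appears once per edge. If you need distinct neighbors, deduplicate explicitly.

```scala
// Hypothetical in-memory model of neighbor selection per direction (not Gelly code).
// Edges are (sourceId, targetId) pairs.
object DirectionModel {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  def neighborIds(v: Long, edges: Seq[(Long, Long)], dir: Direction): Seq[Long] = dir match {
    case In  => edges.collect { case (src, `v`) => src } // edges pointing at v
    case Out => edges.collect { case (`v`, trg) => trg } // edges leaving v
    case All => neighborIds(v, edges, In) ++ neighborIds(v, edges, Out) // one entry per edge
  }
}
```

Consider vertex 2 with the edges 1→2, 2→1 and 2→3. `In` yields 1, `Out` yields 1 and 3, and `All` yields 1 twice plus 3.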

**Usage Examples:**

```scala
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceEdgesFunction, ReduceNeighborsFunction, Vertex}
import org.apache.flink.api.scala._
import org.apache.flink.types.LongValue

val env = ExecutionEnvironment.getExecutionEnvironment

// Create a sample graph with numeric vertex values
val vertices = env.fromCollection(Seq(
  new Vertex(1L, 10.0),
  new Vertex(2L, 20.0),
  new Vertex(3L, 30.0),
  new Vertex(4L, 40.0)
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 1.5),
  new Edge(2L, 3L, 2.5),
  new Edge(3L, 4L, 3.5),
  new Edge(1L, 4L, 4.5)
))

val graph = Graph.fromDataSet(vertices, edges, env)

// Basic degree calculations
val inDegrees = graph.inDegrees()   // DataSet[(Long, LongValue)]
val outDegrees = graph.outDegrees() // DataSet[(Long, LongValue)]
val allDegrees = graph.getDegrees() // DataSet[(Long, LongValue)]

// Maximum vertex value among each vertex's neighbors (incoming and outgoing)
val maxNeighborValue = graph.reduceOnNeighbors(
  new ReduceNeighborsFunction[Double] {
    override def reduceNeighbors(first: Double, second: Double): Double = {
      math.max(first, second)
    }
  },
  EdgeDirection.ALL
)

// Sum of outgoing edge weights per vertex
val sumEdgeWeights = graph.reduceOnEdges(
  new ReduceEdgesFunction[Double] {
    override def reduceEdges(first: Double, second: Double): Double = {
      first + second
    }
  },
  EdgeDirection.OUT
)

// DataSets are evaluated lazily; print() triggers execution and prints the result
maxNeighborValue.print()
```

### Advanced Analytics Examples

```scala
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.graph.scala.{EdgesFunction, NeighborsFunctionWithVertexValue}
import org.apache.flink.util.Collector

// Reuses `graph: Graph[Long, Double, Double]` from the previous example.

// Custom edge analysis: (vertexId, edgeCount, minWeight, maxWeight) per vertex
class EdgeStatistics extends EdgesFunction[Long, Double, (Long, Int, Double, Double)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])],
                            out: Collector[(Long, Int, Double, Double)]): Unit = {
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      val vertexId = edgeList.head._1 // first tuple field: ID of the vertex being reduced
      val edgeCount = edgeList.size
      val weights = edgeList.map(_._2.getValue)
      val minWeight = weights.min
      val maxWeight = weights.max

      out.collect((vertexId, edgeCount, minWeight, maxWeight))
    }
  }
}

// Apply custom edge statistics to outgoing edges
val edgeStats = graph.groupReduceOnEdges(new EdgeStatistics(), EdgeDirection.OUT)

// Custom neighbor analysis with access to the vertex:
// (vertexId, average neighbor value, average edge weight)
class NeighborAnalysis extends NeighborsFunctionWithVertexValue[Long, Double, Double, (Long, Double, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, Double],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val avgNeighborValue = neighborList.map(_._2.getValue).sum / neighborList.size
      val avgEdgeWeight = neighborList.map(_._1.getValue).sum / neighborList.size

      out.collect((vertex.getId, avgNeighborValue, avgEdgeWeight))
    }
  }
}

// Apply neighbor analysis to incoming and outgoing neighbors
val neighborStats = graph.groupReduceOnNeighbors(new NeighborAnalysis(), EdgeDirection.ALL)
```

### Analytical Patterns

#### Local Graph Properties

Calculate properties of individual vertices from their neighborhoods:

```scala
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala.NeighborsFunction
import org.apache.flink.util.Collector

// Skeleton for a vertex clustering coefficient. A NeighborsFunction sees only the
// vertex's own edges and neighbors, not the edges *between* those neighbors, so the
// number of links among neighbors must be computed in a separate step.
class ClusteringCoefficient extends NeighborsFunction[Long, Double, Double, (Long, Double)] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.size >= 2) {
      val vertexId = neighborList.head._1
      val neighborIds = neighborList.map(_._3.getId).toSet

      // Possible links among k distinct neighbors: k * (k - 1) / 2
      val possibleEdges = neighborIds.size * (neighborIds.size - 1) / 2
      // Placeholder: links among neighbors are not visible here, so this stays 0
      val linksAmongNeighbors = 0
      val clustering =
        if (possibleEdges > 0) linksAmongNeighbors.toDouble / possibleEdges else 0.0

      out.collect((vertexId, clustering))
    }
  }
}
```
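Computing the coefficient requires the links among a vertex's neighbors, which a neighborhood function cannot see. For reference, the plain-Scala sketch below computes the local clustering coefficient of an undirected graph on a single machine. It is hypothetical and not Gelly code. It uses C(v) = (links among v's neighbors) / (k(k-1)/2), where k is the number of distinct neighbors. It treats edges as undirected, drops self-loops, and skips vertices with fewer than two neighbors. On a cluster, the neighbor-link counts would typically come from a triangle-based computation. Gelly's algorithm library also contains clustering-coefficient implementations under `org.apache.flink.graph.library.clustering`, which may be worth checking for your Flink version.

```scala
// Hypothetical single-machine sketch of the local clustering coefficient (not Gelly code).
object LocalClustering {
  // Undirected adjacency sets: each edge is recorded at both endpoints; self-loops are dropped.
  def adjacency(edges: Seq[(Long, Long)]): Map[Long, Set[Long]] =
    edges
      .filter { case (a, b) => a != b }
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  // C(v) = (links among v's neighbors) / (k * (k - 1) / 2) for the k distinct neighbors of v.
  // Vertices with fewer than two neighbors are skipped.
  def coefficients(edges: Seq[(Long, Long)]): Map[Long, Double] = {
    val adj = adjacency(edges)
    adj.collect {
      case (v, neighbors) if neighbors.size >= 2 =>
        val k = neighbors.size
        // Every link among the neighbors is seen from both of its endpoints, so halve the count.
        val links = neighbors.toSeq.map(n => (adj(n) intersect neighbors).size).sum / 2
        v -> (links.toDouble / (k * (k - 1) / 2))
    }
  }
}
```

Consider a triangle 1-2-3 with a pendant edge 3-4. Vertices 1 and 2 get 1.0. Vertex 3 gets 1/3, because only one of its three neighbor pairs is linked. Vertex 4 is skipped.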

#### Aggregated Statistics

Compute graph-wide statistics by combining local measurements:

```scala
// Combine per-vertex degrees into graph-wide metrics.
// collect() executes the job and brings every (vertexId, degree) pair to the client,
// so this approach only suits graphs whose vertex count fits in client memory.
val degreeStats = graph.getDegrees().collect()
val degrees = degreeStats.map(_._2.getValue)
val avgDegree = degrees.sum / degrees.length.toDouble
val maxDegree = degrees.max
val minDegree = degrees.min
```
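The average degree can also be cross-checked without collecting every degree. Assume each edge counts once toward the out-degree $\deg^{+}$ of its source and once toward the in-degree $\deg^{-}$ of its target, which is the assumption behind treating `getDegrees()` as in-degree plus out-degree. Then the degree sum depends only on the edge count:

$$
\sum_{v \in V} \deg(v) = \sum_{v \in V} \left(\deg^{-}(v) + \deg^{+}(v)\right) = 2|E|, \qquad \bar{d} = \frac{2|E|}{|V|}
$$

In Gelly, $|V|$ and $|E|$ are available from `graph.numberOfVertices()` and `graph.numberOfEdges()`. For the four-vertex, four-edge sample graph in the usage example, $\bar{d} = 2 \cdot 4 / 4 = 2$, which matches the per-vertex degree of 2 for every vertex.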
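
### Performance Considerations

- **Direction Selection**: Use `EdgeDirection.IN` or `EdgeDirection.OUT` when only one side of the neighborhood matters. `ALL` considers both incoming and outgoing edges and therefore processes more data.
- **Function Complexity**: Prefer `reduceOnNeighbors` or `reduceOnEdges` when the aggregate can be computed by an associative, commutative pairwise function, since partial results can then be combined incrementally. The group-reduce variants receive the entire neighborhood in a single call.
- **Memory Usage**: Functions that materialize a neighborhood (for example with `toList`, as in the examples above) hold all of a vertex's edges in memory at once, which can be costly for high-degree vertices.
- **Parallelization**: Reductions run in parallel across vertices, but each vertex's neighborhood is aggregated by a single task, so a few very high-degree vertices can cause skew.
- **Reuse**: Each `collect()` or `print()` call triggers a separate job, and any shared upstream computation is repeated in each one. When several results are needed from the same neighborhood aggregation, attach all sinks and call `env.execute()` once.

These analytics capabilities combine built-in metrics with flexible interfaces for custom graph analysis, all executed within Flink's distributed runtime.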