# Graph Analytics

API reference for graph analytics on the Gelly Scala `Graph[K, VV, EV]`: degree metrics, structural transformations, set operations, and neighborhood aggregations.

## Degree Calculations

### Basic Degree Metrics

```scala { .api }
def inDegrees(): DataSet[(K, LongValue)]
```

Returns the in-degree of all vertices in the graph as a DataSet of `(vertexId, inDegree)` tuples.

```scala { .api }
def outDegrees(): DataSet[(K, LongValue)]
```

Returns the out-degree of all vertices in the graph as a DataSet of `(vertexId, outDegree)` tuples.

```scala { .api }
def getDegrees(): DataSet[(K, LongValue)]
```

Returns the total degree (in-degree + out-degree) of all vertices as a DataSet of `(vertexId, degree)` tuples.
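
The three degree definitions can be modeled on plain Scala collections. This is a sketch under stated assumptions, not Gelly's implementation: the hypothetical `DegreeModel` takes a set of vertex ids and a list of directed `(source, target)` pairs and, like the methods above, reports a degree for every vertex, including vertices of degree 0.

```scala
// Hypothetical in-memory model of inDegrees(), outDegrees(), and getDegrees(); not Gelly code.
// Edges are directed (source, target) pairs of vertex ids.
object DegreeModel {
  // In-degree: number of edges whose target is the vertex.
  def inDegrees(vertices: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] =
    vertices.map(v => v -> edges.count(_._2 == v).toLong).toMap

  // Out-degree: number of edges whose source is the vertex.
  def outDegrees(vertices: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] =
    vertices.map(v => v -> edges.count(_._1 == v).toLong).toMap

  // Total degree (getDegrees): in-degree plus out-degree.
  def degrees(vertices: Set[Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    val in = inDegrees(vertices, edges)
    val out = outDegrees(vertices, edges)
    vertices.map(v => v -> (in(v) + out(v))).toMap
  }

  def main(args: Array[String]): Unit = {
    val vertices = Set(1L, 2L, 3L)
    val edges = Seq(1L -> 2L, 1L -> 3L, 2L -> 3L)
    println(inDegrees(vertices, edges))
    println(outDegrees(vertices, edges))
    println(degrees(vertices, edges))
  }
}
```

With the edges `(1, 2)`, `(1, 3)`, `(2, 3)`, vertex 1 has in-degree 0 and out-degree 2, and every vertex has total degree 2.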

## Structural Transformations

### Graph Structure Modifications

```scala { .api }
def getUndirected(): Graph[K, VV, EV]
```

Creates an undirected version of the graph by adding the reverse of every edge. Each edge `(u,v)` results in both `(u,v)` and `(v,u)` edges, with the same edge value. Gelly graphs are always directed, so an undirected graph is represented as a directed graph with edges in both directions.

```scala { .api }
def reverse(): Graph[K, VV, EV]
```

Reverses the direction of all edges in the graph. Edge `(u,v)` becomes `(v,u)`.
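
Both transformations act only on the edge set. The sketch below models them on a plain edge list; `StructureModel` and its triple-based edge representation are hypothetical and not Gelly code.

```scala
// Hypothetical in-memory model of getUndirected() and reverse(); not Gelly code.
// Edges are (source, target, value) triples; vertices are unaffected by both operations.
object StructureModel {
  type E = (Long, Long, Double)

  // reverse(): each edge (u, v) becomes (v, u), keeping its value.
  def reverse(edges: Seq[E]): Seq[E] =
    edges.map { case (u, v, w) => (v, u, w) }

  // getUndirected(): keep each original edge and add its reversed copy.
  def undirected(edges: Seq[E]): Seq[E] =
    edges ++ reverse(edges)

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L, 0.5), (2L, 3L, 1.5))
    println(reverse(edges))
    println(undirected(edges))
  }
}
```

The model does not deduplicate: if a graph already contains both `(u, v)` and `(v, u)`, each of them appears twice in the undirected result.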

### Validation

```scala { .api }
def validate(validator: GraphValidator[K, VV, EV]): Boolean
```

Validates the graph with the given `GraphValidator` instance and returns `true` if the graph satisfies the validator's rules, `false` otherwise.

**Parameters:**
- `validator` - GraphValidator that defines validation rules
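
A validator is essentially a predicate over the whole graph. The sketch below models one on plain collections using the rule that Gelly's built-in `InvalidVertexIdsValidator` checks (every edge endpoint must be an existing vertex id); `ValidationModel` and its members are hypothetical and not part of Gelly.

```scala
// Hypothetical in-memory model of graph validation; not Gelly code.
// A validator is a predicate over (vertex ids, directed (source, target) edges).
object ValidationModel {
  type Validator = (Set[Long], Seq[(Long, Long)]) => Boolean

  // Rule: every edge's source and target must be known vertex ids.
  val validVertexIds: Validator = (vertices, edges) =>
    edges.forall { case (src, trg) => vertices.contains(src) && vertices.contains(trg) }

  def validate(vertices: Set[Long], edges: Seq[(Long, Long)], validator: Validator): Boolean =
    validator(vertices, edges)

  def main(args: Array[String]): Unit = {
    val vertices = Set(1L, 2L, 3L)
    println(validate(vertices, Seq(1L -> 2L, 2L -> 3L), validVertexIds)) // true
    println(validate(vertices, Seq(1L -> 2L, 3L -> 4L), validVertexIds)) // false: 4 is not a vertex
  }
}
```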

## Set Operations

### Graph Union

```scala { .api }
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
```

Computes the union of the vertex and edge sets of this graph and `graph`. Duplicate vertices are removed from the result, while duplicate edges are preserved.

**Parameters:**
- `graph` - The graph to perform union with
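
The asymmetry between vertices and edges can be sketched on plain collections. In this hypothetical `UnionModel` (not Gelly code), two vertices count as duplicates when both id and value match; that equality rule is an assumption of the model.

```scala
// Hypothetical in-memory model of union(); not Gelly code.
// Vertices are (id, value) pairs; edges are (source, target, value) triples.
object UnionModel {
  def union[VV, EV](
      v1: Seq[(Long, VV)], e1: Seq[(Long, Long, EV)],
      v2: Seq[(Long, VV)], e2: Seq[(Long, Long, EV)]): (Seq[(Long, VV)], Seq[(Long, Long, EV)]) =
    // Duplicate vertices are dropped; edge lists are concatenated, keeping duplicates.
    ((v1 ++ v2).distinct, e1 ++ e2)

  def main(args: Array[String]): Unit = {
    val (vertices, edges) = union(
      Seq(1L -> "a", 2L -> "b"), Seq((1L, 2L, 1.0)),
      Seq(2L -> "b", 3L -> "c"), Seq((1L, 2L, 1.0), (2L, 3L, 2.0)))
    println(vertices) // three distinct vertices
    println(edges)    // (1, 2, 1.0) appears twice
  }
}
```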

### Graph Difference

```scala { .api }
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
```

Computes the difference of the vertex and edge sets of this graph and `graph`: vertices and edges that also appear in `graph` are removed. When a source or target vertex is removed, every edge attached to it is removed as well.

**Parameters:**
- `graph` - The graph to perform difference with
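
The two removal rules can be sketched on plain collections. This hypothetical `DifferenceModel` (not Gelly code) compares vertices by id only, which is an assumption of the model, and then drops both the common edges and any edge that lost an endpoint.

```scala
// Hypothetical in-memory model of difference(); not Gelly code.
// Vertices are ids; edges are (source, target, value) triples.
object DifferenceModel {
  def difference[EV](
      v1: Set[Long], e1: Seq[(Long, Long, EV)],
      v2: Set[Long], e2: Seq[(Long, Long, EV)]): (Set[Long], Seq[(Long, Long, EV)]) = {
    // Remove vertices that also occur in the other graph.
    val vertices = v1 -- v2
    val commonEdges = e2.toSet
    // Remove common edges and every edge whose source or target vertex was removed.
    val edges = e1.filter { case e @ (src, trg, _) =>
      !commonEdges.contains(e) && vertices.contains(src) && vertices.contains(trg)
    }
    (vertices, edges)
  }

  def main(args: Array[String]): Unit = {
    val (vertices, edges) = difference(
      Set(1L, 2L, 3L, 4L), Seq((1L, 2L, 1.0), (2L, 3L, 1.0), (3L, 4L, 1.0)),
      Set(3L), Seq.empty[(Long, Long, Double)])
    println(vertices) // vertex 3 is gone
    println(edges)    // only (1, 2, 1.0) survives
  }
}
```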

### Graph Intersection

```scala { .api }
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]
```

Computes the intersection of the edge sets of this graph and `graph`. Two edges are considered equal if they have the same source identifier, target identifier, and edge value. The vertices of the result are the endpoints of the common edges and carry no value (`NullValue`).

**Parameters:**
- `graph` - The graph to perform intersection with
- `distinctEdges` - If `true`, exactly one edge represents all pairs of equal edges; if `false`, both edges of each pair are included
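
The effect of `distinctEdges` can be sketched with edge counts. The hypothetical `IntersectModel` below (not Gelly code) assumes that each edge instance can be matched with at most one equal edge of the other graph, so an edge occurring `n1` and `n2` times forms `min(n1, n2)` pairs.

```scala
// Hypothetical in-memory model of intersect(); not Gelly code.
// Edges are (source, target, value) triples; equal triples are equal edges.
object IntersectModel {
  type E = (Long, Long, Long)

  def intersect(e1: Seq[E], e2: Seq[E], distinctEdges: Boolean): Seq[E] = {
    val counts1 = e1.groupBy(identity).map { case (e, xs) => e -> xs.size }
    val counts2 = e2.groupBy(identity).map { case (e, xs) => e -> xs.size }
    counts1.toSeq.flatMap { case (e, n1) =>
      // Each edge instance joins at most one pair, so there are min(n1, n2) pairs.
      val pairs = math.min(n1, counts2.getOrElse(e, 0))
      if (pairs == 0) Seq.empty[E]
      else if (distinctEdges) Seq(e) // one edge represents all pairs
      else Seq.fill(2 * pairs)(e)    // both edges of every pair
    }
  }

  // Result vertices: the endpoints of the remaining edges (valueless, like NullValue).
  def vertexIds(edges: Seq[E]): Set[Long] =
    edges.flatMap { case (s, t, _) => Seq(s, t) }.toSet

  def main(args: Array[String]): Unit = {
    val a = Seq((1L, 2L, 10L), (1L, 2L, 10L), (2L, 3L, 5L), (3L, 4L, 7L))
    val b = Seq((1L, 2L, 10L), (2L, 3L, 6L), (3L, 4L, 7L), (3L, 4L, 7L))
    println(intersect(a, b, distinctEdges = true).sorted)
    println(intersect(a, b, distinctEdges = false).sorted)
  }
}
```

In the sample data, `(2, 3)` is not shared because its values differ (5 vs 6), while `(1, 2, 10)` and `(3, 4, 7)` each form one pair: they appear once with `distinctEdges = true` and twice with `distinctEdges = false`.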

## Neighborhood Operations

### Edge-based Aggregations

```scala { .api }
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]
```

Computes an aggregate over the edges of each vertex without access to the vertex value. The function may emit any number of results of type `T` per vertex through its `Collector`.

**Parameters:**
- `edgesFunction` - Function to apply to the edges of each vertex
- `direction` - Edge direction (IN, OUT, ALL)

```scala { .api }
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```

Computes an aggregate over the edges of each vertex with access to the vertex value.

**Parameters:**
- `edgesFunction` - Function that has access to both the vertex value and its edges
- `direction` - Edge direction (IN, OUT, ALL)
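
The grouping behind `groupReduceOnEdges` can be sketched on plain collections. In the hypothetical `EdgesGroupModel` below (not Gelly code), `Direction` stands in for `EdgeDirection`, each vertex receives `(vertexId, edge)` pairs as an `EdgesFunction` does, and the user function may emit any number of results. In this model only vertices with at least one edge in the chosen direction form a group; the `EdgesFunctionWithVertexValue` overload would additionally receive the vertex itself.

```scala
// Hypothetical in-memory model of groupReduceOnEdges; not Gelly code.
// Edges are (source, target, value) triples.
object EdgesGroupModel {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  type E = (Long, Long, Double)

  // Key every edge by the vertex (or vertices) it belongs to for the given direction.
  def edgesPerVertex(edges: Seq[E], direction: Direction): Map[Long, Seq[(Long, E)]] = {
    val keyed: Seq[(Long, E)] = direction match {
      case In  => edges.map(e => (e._2, e))                     // grouped by target
      case Out => edges.map(e => (e._1, e))                     // grouped by source
      case All => edges.flatMap(e => Seq((e._1, e), (e._2, e))) // both endpoints
    }
    keyed.groupBy(_._1)
  }

  // Call f once per vertex group and concatenate everything it emits.
  def groupReduceOnEdges[T](edges: Seq[E], direction: Direction)(f: Seq[(Long, E)] => Seq[T]): Seq[T] =
    edgesPerVertex(edges, direction).values.toSeq.flatMap(f)

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L, 1.0), (1L, 3L, 2.0), (2L, 3L, 4.0))
    // Sum of outgoing edge values per vertex, emitted as (vertexId, sum).
    val sums = groupReduceOnEdges(edges, Out)(group => Seq((group.head._1, group.map(_._2._3).sum)))
    println(sums.toMap)
  }
}
```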

### Neighbor-based Aggregations

```scala { .api }
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```

Computes an aggregate over the neighborhood of each vertex, i.e. its edges and the adjacent vertices at their other ends, without access to the vertex's own value.

**Parameters:**
- `neighborsFunction` - Function to apply to the neighborhood
- `direction` - Edge direction (IN, OUT, ALL)

```scala { .api }
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```

Computes an aggregate over the neighborhood of each vertex with access to that vertex's own value.

**Parameters:**
- `neighborsFunction` - Function that has access to the vertex being processed and its neighbors
- `direction` - Edge direction (IN, OUT, ALL)
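
A neighborhood pairs each edge with the vertex at its other end. The hypothetical `NeighborsGroupModel` below (not Gelly code) sketches the `NeighborsFunctionWithVertexValue` variant on plain collections: vertex values come from a `Map`, `Direction` stands in for `EdgeDirection`, and for `In` the neighbor is the edge source while for `Out` it is the target.

```scala
// Hypothetical in-memory model of groupReduceOnNeighbors; not Gelly code.
// Edges are (source, target, value) triples; `values` maps vertex ids to vertex values.
object NeighborsGroupModel {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  type E = (Long, Long, Double)

  // For each vertex, the (edge, neighborId) pairs in the given direction.
  def neighborsPerVertex(edges: Seq[E], direction: Direction): Map[Long, Seq[(E, Long)]] = {
    val keyed: Seq[(Long, (E, Long))] = direction match {
      case In  => edges.map(e => (e._2, (e, e._1))) // neighbor is the source
      case Out => edges.map(e => (e._1, (e, e._2))) // neighbor is the target
      case All => edges.flatMap(e => Seq((e._1, (e, e._2)), (e._2, (e, e._1))))
    }
    keyed.groupBy(_._1).map { case (v, xs) => v -> xs.map(_._2) }
  }

  // Call f once per vertex with (id, value) of the vertex and its (edge, (id, value)) neighbors.
  def groupReduceOnNeighbors[VV, T](values: Map[Long, VV], edges: Seq[E], direction: Direction)(
      f: ((Long, VV), Seq[(E, (Long, VV))]) => Seq[T]): Seq[T] =
    neighborsPerVertex(edges, direction).toSeq.flatMap { case (v, ns) =>
      f((v, values(v)), ns.map { case (e, n) => (e, (n, values(n))) })
    }

  def main(args: Array[String]): Unit = {
    val values = Map(1L -> "a", 2L -> "b", 3L -> "c")
    val edges = Seq((1L, 2L, 1.0), (1L, 3L, 1.0), (2L, 3L, 1.0))
    // For each vertex, emit its value together with its out-neighbors' values.
    val result = groupReduceOnNeighbors(values, edges, Out) { (vertex, neighbors) =>
      Seq(vertex._2 -> neighbors.map(_._2._2).sorted.mkString(","))
    }
    println(result.toMap)
  }
}
```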

## Reduction Operations

### Neighbor Value Reduction

```scala { .api }
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]
```

Computes a reduce transformation over the vertex values of each vertex's neighbors. The function repeatedly combines two neighbor values into one until only a single value remains. Because the order in which values are combined is not fixed, the function should be associative and commutative.

**Parameters:**
- `reduceNeighborsFunction` - Reduce function to apply to neighbor values
- `direction` - Edge direction (IN, OUT, ALL)

**Returns:** DataSet of `(vertexId, aggregatedValue)` tuples
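
The pairwise combination can be sketched with Scala's `reduce` over each vertex's neighbor values. The hypothetical `ReduceNeighborsModel` below (not Gelly code) uses a fixed left-to-right order, which hides the ordering freedom a distributed reduce has; only vertices with at least one neighbor in the chosen direction appear in its result.

```scala
// Hypothetical in-memory model of reduceOnNeighbors; not Gelly code.
// Edges are directed (source, target) pairs; `values` maps vertex ids to vertex values.
object ReduceNeighborsModel {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  def reduceOnNeighbors[VV](values: Map[Long, VV], edges: Seq[(Long, Long)], direction: Direction)(
      combine: (VV, VV) => VV): Map[Long, VV] = {
    // Pair every vertex with each of its neighbors in the given direction.
    val vertexAndNeighbor: Seq[(Long, Long)] = direction match {
      case In  => edges.map { case (s, t) => (t, s) }
      case Out => edges
      case All => edges.flatMap { case (s, t) => Seq((s, t), (t, s)) }
    }
    // Combine the neighbor values two at a time until one value is left per vertex.
    vertexAndNeighbor.groupBy(_._1).map { case (v, pairs) =>
      v -> pairs.map(p => values(p._2)).reduce(combine)
    }
  }

  def main(args: Array[String]): Unit = {
    val values = Map(1L -> 1, 2L -> 10, 3L -> 100)
    val edges = Seq(1L -> 2L, 1L -> 3L, 2L -> 3L)
    // Sum of all neighbor values, in either direction.
    println(reduceOnNeighbors(values, edges, All)(_ + _))
  }
}
```

Summation is associative and commutative, so its result here does not depend on the combination order.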

### Edge Value Reduction

```scala { .api }
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]
```

Computes a reduce transformation over the edge values of each vertex. The function repeatedly combines two edge values into one until only a single value remains, so it should be associative and commutative: the order in which values are combined is not fixed.

**Parameters:**
- `reduceEdgesFunction` - Reduce function to apply to edge values
- `direction` - Edge direction (IN, OUT, ALL)

**Returns:** DataSet of `(vertexId, aggregatedValue)` tuples
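
The same idea applied to edge values can be sketched as follows. The hypothetical `ReduceEdgesModel` (not Gelly code) keys each edge value by the vertex it belongs to for the chosen direction, then reduces each group pairwise; with `All`, an edge contributes its value to both endpoints.

```scala
// Hypothetical in-memory model of reduceOnEdges; not Gelly code.
// Edges are (source, target, value) triples.
object ReduceEdgesModel {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  def reduceOnEdges[EV](edges: Seq[(Long, Long, EV)], direction: Direction)(
      combine: (EV, EV) => EV): Map[Long, EV] = {
    // Key each edge value by its target, its source, or both endpoints.
    val keyed: Seq[(Long, EV)] = direction match {
      case In  => edges.map { case (_, t, w) => (t, w) }
      case Out => edges.map { case (s, _, w) => (s, w) }
      case All => edges.flatMap { case (s, t, w) => Seq((s, w), (t, w)) }
    }
    // Combine the edge values two at a time until one value is left per vertex.
    keyed.groupBy(_._1).map { case (v, vs) => v -> vs.map(_._2).reduce(combine) }
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L, 3L), (1L, 3L, 7L), (2L, 3L, 5L))
    // Largest outgoing edge value per vertex.
    println(reduceOnEdges(edges, Out)((a, b) => math.max(a, b)))
  }
}
```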

## Custom Function Types

### EdgesFunction

```scala { .api }
abstract class EdgesFunction[K, EV, T] {
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}
```

Abstract base class for functions that operate on the edges of a vertex. Each element of `edges` pairs the id of the vertex being processed with one of its edges.

### EdgesFunctionWithVertexValue

```scala { .api }
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] {
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}
```

Abstract base class for functions that operate on the edges of a vertex with access to the vertex itself, including its value.

### NeighborsFunction

```scala { .api }
abstract class NeighborsFunction[K, VV, EV, T] {
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], out: Collector[T]): Unit
}
```

Abstract base class for functions that operate on the neighborhood of a vertex (its edges and adjacent vertices). Each element of `neighbors` is a `(vertexId, edge, neighborVertex)` triple, where `vertexId` is the id of the vertex being processed.

### NeighborsFunctionWithVertexValue

```scala { .api }
abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] {
  def iterateNeighbors(vertex: Vertex[K, VV], neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])], out: Collector[T]): Unit
}
```

Abstract base class for functions that operate on the neighbors of a vertex with access to the vertex itself, including its value. Each element of `neighbors` is an `(edge, neighborVertex)` pair.

## Edge Direction Enum

```java { .api }
public enum EdgeDirection { IN, OUT, ALL }
```

`org.apache.flink.graph.EdgeDirection` is a Java enum that selects which edges neighborhood operations consider:
- `IN` - Consider only incoming edges
- `OUT` - Consider only outgoing edges
- `ALL` - Consider both incoming and outgoing edges

## Usage Examples

### Degree Analysis

```scala
import org.apache.flink.api.scala._

// Assumes `graph` is an existing Graph[Long, String, Double]

// Calculate all degree metrics
val inDegrees = graph.inDegrees()
val outDegrees = graph.outDegrees()
val totalDegrees = graph.getDegrees()

// Find vertices with an out-degree greater than 10
val highOutDegree = outDegrees.filter(_._2.getValue > 10)
```

### Structural Operations

```scala
// Assumes `graph` and `otherGraph` are existing graphs with the same type parameters

// Create undirected version
val undirectedGraph = graph.getUndirected()

// Reverse all edges
val reversedGraph = graph.reverse()

// Combine with another graph
val combinedGraph = graph.union(otherGraph)

// Keep only the edges present in both graphs (vertex values become NullValue)
val intersection = graph.intersect(otherGraph, distinctEdges = true)
```
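
### Neighborhood Aggregations

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction}
import org.apache.flink.graph.scala.EdgesFunction
import org.apache.flink.util.Collector

// Assumes `graph` is an existing Graph[Long, String, Double]

// Custom edge aggregation: sums the edge values of each vertex and emits (vertexId, sum)
class SumEdgeValues extends EdgesFunction[Long, Double, (Long, Double)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])], out: Collector[(Long, Double)]): Unit = {
    // Materialize once; the first tuple field is the id of the vertex being aggregated
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      out.collect((edgeList.head._1, edgeList.map(_._2.getValue).sum))
    }
  }
}

// Apply edge aggregation over outgoing edges
val edgeSums = graph.groupReduceOnEdges(new SumEdgeValues(), EdgeDirection.OUT)

// Concatenate the values of all neighbors (neighbor order is not guaranteed)
val neighborValues = graph.reduceOnNeighbors(
  new ReduceNeighborsFunction[String] {
    override def reduceNeighbors(firstNeighborValue: String, secondNeighborValue: String): String = {
      firstNeighborValue + "," + secondNeighborValue
    }
  },
  EdgeDirection.ALL
)
```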
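
### Advanced Analytics

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.graph.scala.EdgesFunctionWithVertexValue
import org.apache.flink.util.Collector

// Calculate average edge weight per vertex
// Assumes `graph` is an existing Graph[Long, String, Double]
class AverageEdgeWeight extends EdgesFunctionWithVertexValue[Long, String, Double, (Long, Double)] {
  override def iterateEdges(vertex: Vertex[Long, String], edges: Iterable[Edge[Long, Double]], out: Collector[(Long, Double)]): Unit = {
    // Materialize once: the edge iterable may be traversable only a single time
    val edgeList = edges.toList
    // Skip vertices without edges in the requested direction (the average is undefined)
    if (edgeList.nonEmpty) {
      val average = edgeList.map(_.getValue).sum / edgeList.size
      out.collect((vertex.getId, average))
    }
  }
}

val avgWeights = graph.groupReduceOnEdges(new AverageEdgeWeight(), EdgeDirection.OUT)
```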