# Graph Analytics and Metrics

Gelly's analytics framework (ASM - Analytics, Statistics, Metrics) provides tools for computing graph statistics, degree distributions, and structural metrics. It covers both graph-level analytics and general DataSet analytics, and collects results through accumulators.

## Capabilities

### Analytics Execution Interface

Analytics implement either `GraphAnalytic` for graph-specific analysis or `DataSetAnalytic` for general dataset analysis.

```java { .api }
public interface GraphAnalytic<K, VV, EV, T> {
    T getResult();
    T execute() throws Exception;
    T execute(String executionName) throws Exception;
    GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input) throws Exception;
}

public interface DataSetAnalytic<T, R> {
    R getResult();
    R execute() throws Exception;
    R execute(String executionName) throws Exception;
    DataSetAnalytic<T, R> run(DataSet<T> input);
}

// Attach an analytic to a graph (method on Graph).
// The analytic is returned; call execute() on it to obtain the result.
public <T> GraphAnalytic<K, VV, EV, T> run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception
```
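
The split between `run`, `execute`, and `getResult` can be easier to see outside Flink. The following is a minimal plain-Java sketch of that contract, not Gelly code: `SketchAnalytic` and `CountSketch` are hypothetical names invented for illustration, and an in-memory `List` stands in for a `DataSet`.

```java
import java.util.List;

final class AnalyticLifecycleSketch {

    /** Hypothetical stand-in for the run/execute/getResult contract; not a Flink type. */
    interface SketchAnalytic<I, R> {
        SketchAnalytic<I, R> run(I input); // attach the input; no computation yet
        R execute();                       // perform the computation and return the result
        R getResult();                     // read the result after execute()
    }

    /** Counts list elements, but only when execute() is called. */
    static final class CountSketch<T> implements SketchAnalytic<List<T>, Long> {
        private List<T> input;
        private Long result;

        @Override
        public CountSketch<T> run(List<T> input) {
            this.input = input;
            return this;
        }

        @Override
        public Long execute() {
            if (input == null) {
                throw new IllegalStateException("call run(input) before execute()");
            }
            result = (long) input.size();
            return result;
        }

        @Override
        public Long getResult() {
            if (result == null) {
                throw new IllegalStateException("call execute() before getResult()");
            }
            return result;
        }
    }

    public static void main(String[] args) {
        CountSketch<String> count = new CountSketch<>();
        long n = count.run(List.of("a", "b", "c")).execute();
        System.out.println("count = " + n);
    }
}
```

In this sketch `run` only records the input, so several analytics could be attached before any work happens; the result becomes readable once `execute` has run.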

### Basic Graph Statistics

Core graph statistics are available as direct methods on the `Graph` class.

```java { .api }
// Vertex and edge counts
public long numberOfVertices() throws Exception
public long numberOfEdges() throws Exception

// Degree computations
public DataSet<Tuple2<K, LongValue>> inDegrees()
public DataSet<Tuple2<K, LongValue>> outDegrees()
public DataSet<Tuple2<K, LongValue>> getDegrees()
```

**Usage Example:**

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.LongValue;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, String, Double> graph = /* ... */;

// Basic statistics
long vertexCount = graph.numberOfVertices();
long edgeCount = graph.numberOfEdges();

System.out.println("Graph has " + vertexCount + " vertices and " + edgeCount + " edges");

// Degree analysis
DataSet<Tuple2<Long, LongValue>> degrees = graph.getDegrees();
degrees.print(); // Print all vertex degrees

// Find vertices with high degree
DataSet<Tuple2<Long, LongValue>> highDegree = degrees
    .filter(degree -> degree.f1.getValue() > 5);
```
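
To make the degree definitions concrete, here is a small plain-Java sketch that derives in-degree and out-degree from a directed edge list. It does not use Flink; `DegreeSketch` and its `degrees` method are hypothetical names, and the sketch only sees vertices that appear on at least one edge.

```java
import java.util.Map;
import java.util.TreeMap;

final class DegreeSketch {

    /** Returns vertex -> {inDegree, outDegree} for directed edges given as {source, target}. */
    static Map<Long, long[]> degrees(long[][] edges) {
        Map<Long, long[]> result = new TreeMap<>();
        for (long[] edge : edges) {
            result.computeIfAbsent(edge[0], k -> new long[2])[1]++; // out-degree of the source
            result.computeIfAbsent(edge[1], k -> new long[2])[0]++; // in-degree of the target
        }
        return result;
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {1, 3}, {2, 3}, {3, 1}};
        degrees(edges).forEach((vertex, d) ->
            System.out.println(vertex + ": in=" + d[0] + " out=" + d[1] + " total=" + (d[0] + d[1])));
    }
}
```

The total degree of a vertex is the sum of its in-degree and out-degree, which is the quantity the "high degree" filter above compares against a threshold.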

### DataSet Analytics

General-purpose analytics for any DataSet, including graph components.

#### Count Analytics

Count elements in a DataSet with optional execution naming.

```java { .api }
public class Count<T> extends AbstractDataSetAnalytic<T, LongValue> {
    public Count()
}
```

**Usage Example:**

```java
// Count vertices: attach the analytic to the DataSet, then execute
Count<Vertex<Long, String>> vertexCount = new Count<>();
LongValue count = vertexCount.run(graph.getVertices()).execute();

// Count edges
Count<Edge<Long, Double>> edgeCount = new Count<>();
LongValue edgeCountResult = edgeCount.run(graph.getEdges()).execute();
```

#### Collect Analytics

Collect DataSet elements to the driver program.

```java { .api }
public class Collect<T> extends AbstractDataSetAnalytic<T, List<T>> {
    public Collect()
}
```

**Usage Example:**

```java
// Collect small datasets: attach the analytic to the DataSet, then execute
Collect<Vertex<Long, String>> collector = new Collect<>();
List<Vertex<Long, String>> vertices = collector.run(graph.getVertices()).execute();

for (Vertex<Long, String> vertex : vertices) {
    System.out.println("Vertex: " + vertex.getId() + " = " + vertex.getValue());
}
```

#### Checksum Analytics

Compute checksums for data validation and testing.

```java { .api }
public class ChecksumHashCode<T> extends AbstractDataSetAnalytic<T, Tuple3<IntValue, LongValue, LongValue>> {
    public ChecksumHashCode()
}
```

**Usage Example:**

```java
// Compute checksum of vertices: attach the analytic to the DataSet, then execute
ChecksumHashCode<Vertex<Long, String>> checksum = new ChecksumHashCode<>();
Tuple3<IntValue, LongValue, LongValue> result = checksum.run(graph.getVertices()).execute();

// Result contains: (hashCode, count, checksum)
System.out.println("Hash: " + result.f0.getValue());
System.out.println("Count: " + result.f1.getValue());
System.out.println("Checksum: " + result.f2.getValue());
```
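
A checksum is useful for validation when it does not depend on element order, since a distributed DataSet has no fixed order. The plain-Java sketch below shows one way such a checksum can be built, assuming it is the sum of each element's `hashCode()` read as an unsigned 32-bit value. That formula is an assumption made for illustration, and `ChecksumSketch` is a hypothetical name, not a Gelly class.

```java
import java.util.Arrays;
import java.util.List;

final class ChecksumSketch {

    /** Returns {count, checksum}; the checksum sums hash codes as unsigned 32-bit values. */
    static long[] countAndChecksum(List<?> elements) {
        long count = 0;
        long checksum = 0;
        for (Object element : elements) {
            count++;
            checksum += element.hashCode() & 0xffffffffL; // mask keeps the value non-negative
        }
        return new long[] {count, checksum};
    }

    public static void main(String[] args) {
        long[] a = countAndChecksum(Arrays.asList("x", "y", "z"));
        long[] b = countAndChecksum(Arrays.asList("z", "x", "y"));
        System.out.println("same count: " + (a[0] == b[0]) + ", same checksum: " + (a[1] == b[1]));
    }
}
```

Because addition is commutative, two runs over the same elements in different orders give the same pair, so a test can compare the pair against a stored value.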

### Degree-Based Analytics

Degree annotation for both directed and undirected graphs.

#### Directed Graph Degree Analytics

Algorithms for directed graphs that report in-degree and out-degree separately.

```java { .api }
// Package: org.apache.flink.graph.asm.degree.annotate.directed
// These are graph algorithms: Graph.run(GraphAlgorithm) returns the annotated DataSet directly.
// Degrees is VertexDegrees.Degrees, which holds degree, out-degree, and in-degree.

// Vertex degree annotation
public class VertexDegrees<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>>

public class VertexInDegree<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>>

public class VertexOutDegree<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>>

// Edge degree annotation
public class EdgeDegreesPair<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>>

public class EdgeSourceDegrees<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>>

public class EdgeTargetDegrees<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>>
```

**Usage Example:**

```java
// Annotate vertices with in-degree and out-degree
VertexDegrees<Long, String, Double> vertexDegrees = new VertexDegrees<>();
DataSet<Vertex<Long, Degrees>> degreeResult = graph.run(vertexDegrees);

// Result format: Vertex<ID, Degrees>; read values with getInDegree() and getOutDegree()
degreeResult.print();

// Annotate edges with source and target degrees
EdgeDegreesPair<Long, String, Double> edgeDegrees = new EdgeDegreesPair<>();
DataSet<Edge<Long, Tuple3<Double, Degrees, Degrees>>> edgeResult = graph.run(edgeDegrees);

// Result format: Edge<source, target, (originalEdgeValue, sourceDegrees, targetDegrees)>
edgeResult.print();
```

#### Undirected Graph Degree Analytics

Algorithms for undirected graphs, where each vertex has a single degree value.

```java { .api }
// Package: org.apache.flink.graph.asm.degree.annotate.undirected

// Vertex degree annotation
public class VertexDegree<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>>

// Edge degree annotation
public class EdgeDegreePair<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>>>

public class EdgeSourceDegree<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>>

public class EdgeTargetDegree<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>>
```

**Usage Example:**

```java
// For undirected graph analysis
Graph<Long, String, Double> undirectedGraph = graph.getUndirected();

// Annotate vertices with degree
VertexDegree<Long, String, Double> vertexDegree = new VertexDegree<>();
DataSet<Vertex<Long, LongValue>> result = undirectedGraph.run(vertexDegree);

// Result format: Vertex<ID, degree>
result.print();

// Find high-degree vertices
DataSet<Vertex<Long, LongValue>> highDegreeVertices = result
    .filter(vertex -> vertex.getValue().getValue() > 10);
```

### Degree Filtering

Filter graphs based on degree constraints.

```java { .api }
// Filter by maximum degree (undirected graphs)
// Package: org.apache.flink.graph.asm.degree.filter.undirected
public class MaximumDegree<K, VV, EV> implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
    public MaximumDegree(long maximumDegree)
}
```

**Usage Example:**

```java
// Remove vertices with degree > 5, together with their edges
MaximumDegree<Long, String, Double> filter = new MaximumDegree<>(5L);
Graph<Long, String, Double> filteredGraph = graph.run(filter);

System.out.println("Original vertices: " + graph.numberOfVertices());
System.out.println("Filtered vertices: " + filteredGraph.numberOfVertices());
```
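
The effect of a maximum-degree filter can be sketched on a plain edge list. The example below is an illustration rather than Gelly's implementation: `MaximumDegreeSketch` is a hypothetical name, degrees are measured once on the input graph, and each undirected edge is assumed to be listed exactly once with no self-loops.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MaximumDegreeSketch {

    /** Keeps only edges whose endpoints both have degree <= maximumDegree in the input graph. */
    static List<long[]> filter(List<long[]> edges, long maximumDegree) {
        Map<Long, Long> degree = new HashMap<>();
        for (long[] edge : edges) {
            degree.merge(edge[0], 1L, Long::sum);
            degree.merge(edge[1], 1L, Long::sum);
        }
        List<long[]> kept = new ArrayList<>();
        for (long[] edge : edges) {
            if (degree.get(edge[0]) <= maximumDegree && degree.get(edge[1]) <= maximumDegree) {
                kept.add(edge);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Vertex 0 is a hub with degree 4; vertices 1 and 2 are also connected to each other.
        List<long[]> edges = Arrays.asList(
            new long[] {0, 1}, new long[] {0, 2}, new long[] {0, 3},
            new long[] {0, 4}, new long[] {1, 2});
        for (long[] edge : filter(edges, 2)) {
            System.out.println(edge[0] + " - " + edge[1]);
        }
    }
}
```

With a maximum degree of 2, the hub and every edge touching it are dropped, which is why such a filter is often applied before algorithms whose cost grows quickly with vertex degree.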

### Graph Metrics

Summary metrics for graph structure analysis. The directed and undirected variants share class names and differ by package.

#### Directed Graph Metrics

```java { .api }
// Package: org.apache.flink.graph.library.metric.directed
public class EdgeMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, EdgeMetrics.Result> {
    public EdgeMetrics()
}

public class VertexMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, VertexMetrics.Result> {
    public VertexMetrics()
}
```

#### Undirected Graph Metrics

```java { .api }
// Package: org.apache.flink.graph.library.metric.undirected
public class EdgeMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, EdgeMetrics.Result> {
    public EdgeMetrics()
}

public class VertexMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, VertexMetrics.Result> {
    public VertexMetrics()
}
```

**Usage Example:**

```java
// Compute vertex metrics: run() attaches the analytic, execute() returns its result
VertexMetrics<Long, String, Double> vertexMetrics =
    new org.apache.flink.graph.library.metric.undirected.VertexMetrics<>();
VertexMetrics.Result metrics = graph.run(vertexMetrics).execute();

// Access various metrics
System.out.println("Vertex count: " + metrics.getNumberOfVertices());
System.out.println("Edge count: " + metrics.getNumberOfEdges());
System.out.println("Average degree: " + metrics.getAverageDegree());
System.out.println("Density: " + metrics.getDensity());
System.out.println("Triplet count: " + metrics.getNumberOfTriplets());

// Compute edge metrics
EdgeMetrics<Long, String, Double> edgeMetrics =
    new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<>();
EdgeMetrics.Result edgeResult = graph.run(edgeMetrics).execute();
```
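
For readers unfamiliar with the quantities printed above, the following plain-Java sketch computes them with the textbook definitions for a simple undirected graph: average degree 2E/V, density 2E/(V(V-1)), and triplets as the sum of d(d-1)/2 over all vertices. These formulas are assumptions for illustration, Gelly's exact conventions may differ, and `UndirectedMetricsSketch` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

final class UndirectedMetricsSketch {
    final long vertexCount;
    final long edgeCount;
    final long tripletCount;

    /** Edges are {u, v} pairs of a simple undirected graph, each listed once. */
    UndirectedMetricsSketch(long[][] edges) {
        Map<Long, Long> degree = new HashMap<>();
        for (long[] edge : edges) {
            degree.merge(edge[0], 1L, Long::sum);
            degree.merge(edge[1], 1L, Long::sum);
        }
        long triplets = 0;
        for (long d : degree.values()) {
            triplets += d * (d - 1) / 2; // paths of length two centered on this vertex
        }
        vertexCount = degree.size();
        edgeCount = edges.length;
        tripletCount = triplets;
    }

    double averageDegree() {
        return vertexCount == 0 ? Double.NaN : 2.0 * edgeCount / vertexCount;
    }

    double density() {
        return vertexCount <= 1 ? Double.NaN : 2.0 * edgeCount / (vertexCount * (vertexCount - 1.0));
    }

    public static void main(String[] args) {
        // A triangle 1-2-3 with one extra vertex 4 attached to vertex 3.
        UndirectedMetricsSketch m =
            new UndirectedMetricsSketch(new long[][] {{1, 2}, {2, 3}, {1, 3}, {3, 4}});
        System.out.println("vertices=" + m.vertexCount + " edges=" + m.edgeCount
            + " triplets=" + m.tripletCount + " avgDegree=" + m.averageDegree()
            + " density=" + m.density());
    }
}
```

The sketch only counts vertices that appear on an edge, so isolated vertices would need to be supplied separately to get a correct vertex count and density.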

### Result Types

Common result types used by analytics.

```java { .api }
// Unary result (single value)
public class UnaryResult<T> implements PrintableResult {
    public T getValue()
}

// Binary result (two values)
public class BinaryResult<T0, T1> implements PrintableResult {
    public T0 getValue0()
    public T1 getValue1()
}

// Tertiary result (three values)
public class TertiaryResult<T0, T1, T2> implements PrintableResult {
    public T0 getValue0()
    public T1 getValue1()
    public T2 getValue2()
}

// Printable result interface
public interface PrintableResult {
    String toPrintableString()
}
```

### Custom Analytics Development

#### Creating Graph Analytics

```java
// MyResult is a placeholder for a user-defined result type.
public class CustomGraphAnalytic<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, MyResult> {

    @Override
    public MyResult execute() throws Exception {
        // `input` is assumed here to hold the graph passed to run(Graph)
        DataSet<Vertex<K, VV>> vertices = input.getVertices();
        DataSet<Edge<K, EV>> edges = input.getEdges();

        // Compute custom analysis
        MyResult result = computeCustomMetric(vertices, edges);
        return result;
    }

    private MyResult computeCustomMetric(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges) {
        // Custom computation logic
        return new MyResult();
    }
}
```

#### Creating DataSet Analytics

```java
// MyResult is a placeholder for a user-defined result type.
public class CustomDataSetAnalytic<T> extends AbstractDataSetAnalytic<T, MyResult> {

    @Override
    public MyResult execute() throws Exception {
        // `input` is assumed here to hold the DataSet passed to run(DataSet)
        return input.aggregate(/* custom aggregation */).collect().get(0);
    }
}
```

### Performance Optimization

#### Memory Management

```java
// TaskManager memory (for example 2 GB) is a cluster setting rather than part of ExecutionConfig;
// set it in the Flink configuration file. The key name depends on the Flink version.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Use efficient data types
Graph<LongValue, NullValue, NullValue> efficientGraph = /* ... */;
```

#### Parallel Execution

```java
// Set parallelism for analytics
env.setParallelism(4);

// Analytics will use the configured parallelism
VertexMetrics<Long, String, Double> metrics = new VertexMetrics<>();
metrics.setParallelism(8); // Override for specific analytic
```

#### Accumulator-Based Results

Analytics use Flink's accumulator mechanism for efficient result collection:

```java
// Results are automatically collected using accumulators
// No explicit collect() calls needed for analytics
MyResult result = graph.run(myAnalytic).execute();
```