# Data Access and Utilities

Gelly provides methods for accessing graph data, computing over vertex neighborhoods, joining graph components with external data, and interoperating with Flink's DataSet API. It also covers graph validation and tuple conversion utilities for common graph operations.

## Capabilities

### Basic Data Access

Access the fundamental graph components as Flink DataSets.

```java { .api }
// Core data access
public DataSet<Vertex<K, VV>> getVertices()
public DataSet<Edge<K, EV>> getEdges()
public DataSet<Triplet<K, VV, EV>> getTriplets()

// ID extraction
public DataSet<K> getVertexIds()
public DataSet<Tuple2<K, K>> getEdgeIds()
```

**Usage Example:**

```java
Graph<Long, String, Double> graph = /* ... */;

// Access vertices and edges
DataSet<Vertex<Long, String>> vertices = graph.getVertices();
DataSet<Edge<Long, Double>> edges = graph.getEdges();

// Process vertices
DataSet<String> vertexValues = vertices.map(vertex -> vertex.getValue());
vertexValues.print();

// Process edges: average = sum / count (assumes the graph has at least one edge)
DataSet<Double> edgeWeights = edges.map(edge -> edge.getValue());
double weightSum = edgeWeights.reduce((a, b) -> a + b).collect().get(0);
System.out.println("Average weight: " + weightSum / edgeWeights.count());

// Get triplets (edges with vertex values)
DataSet<Triplet<Long, String, Double>> triplets = graph.getTriplets();
triplets.print(); // Shows: (srcId, trgId, srcValue, trgValue, edgeValue)
```
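
To make the triplet layout concrete, here is a minimal plain-Java sketch (no Flink dependency) of how each edge is combined with the values of its two endpoints. `TripletSketch` and `buildTriplets` are hypothetical names used only for illustration; Gelly does the equivalent work on distributed DataSets.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TripletSketch {

    // Each edge is {sourceId, targetId}; weights.get(i) is the value of edges.get(i).
    // Returns one string per edge in the order (srcId, trgId, srcValue, trgValue, edgeValue).
    static List<String> buildTriplets(Map<Long, String> vertexValues,
                                      List<long[]> edges,
                                      List<Double> weights) {
        List<String> triplets = new ArrayList<>();
        for (int i = 0; i < edges.size(); i++) {
            long src = edges.get(i)[0];
            long trg = edges.get(i)[1];
            triplets.add("(" + src + "," + trg + ","
                    + vertexValues.get(src) + "," + vertexValues.get(trg) + ","
                    + weights.get(i) + ")");
        }
        return triplets;
    }

    public static void main(String[] args) {
        Map<Long, String> vertexValues = new LinkedHashMap<>();
        vertexValues.put(1L, "alice");
        vertexValues.put(2L, "bob");

        List<long[]> edges = new ArrayList<>();
        edges.add(new long[] {1L, 2L});

        List<Double> weights = new ArrayList<>();
        weights.add(0.5);

        System.out.println(buildTriplets(vertexValues, edges, weights)); // [(1,2,alice,bob,0.5)]
    }
}
```

The sketch looks up both endpoint values for every edge, which is why a triplet carries two vertex values but only one edge value.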

### Neighborhood Operations

Perform computations over each vertex's neighborhood: either its incident edges or its adjacent vertices, selected by `EdgeDirection` (`IN`, `OUT`, or `ALL`).

#### Edge-Based Neighborhood Operations

Operate on edges incident to each vertex.

```java { .api }
public <T> DataSet<T> groupReduceOnEdges(
    EdgesFunction<K, EV, T> edgesFunction,
    EdgeDirection direction)

public <T> DataSet<T> groupReduceOnEdges(
    EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
    EdgeDirection direction)

// Returns one (vertexId, reducedEdgeValue) pair per vertex
public DataSet<Tuple2<K, EV>> reduceOnEdges(
    ReduceEdgesFunction<EV> reduceEdgesFunction,
    EdgeDirection direction)
```
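
The `EdgeDirection` argument decides which edges count as part of a vertex's neighborhood. The following plain-Java sketch (no Flink dependency) models that choice by counting edges per vertex. `EdgeDirectionSketch`, `Direction`, and `degrees` are hypothetical names for illustration, not Gelly classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class EdgeDirectionSketch {

    // Hypothetical stand-in for Gelly's EdgeDirection enum.
    enum Direction { IN, OUT, ALL }

    // Counts, for every vertex with at least one edge in the given direction,
    // how many edges fall into its neighborhood. Each edge is {sourceId, targetId}.
    static Map<Long, Integer> degrees(List<long[]> edges, Direction direction) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long[] edge : edges) {
            if (direction == Direction.OUT || direction == Direction.ALL) {
                counts.merge(edge[0], 1, Integer::sum); // edge leaves its source
            }
            if (direction == Direction.IN || direction == Direction.ALL) {
                counts.merge(edge[1], 1, Integer::sum); // edge enters its target
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
                new long[] {1, 2}, new long[] {1, 3}, new long[] {2, 3});

        System.out.println(degrees(edges, Direction.OUT)); // {1=2, 2=1}
        System.out.println(degrees(edges, Direction.IN));  // {2=1, 3=2}
        System.out.println(degrees(edges, Direction.ALL)); // {1=2, 2=2, 3=2}
    }
}
```

In this model an edge is attributed to its source with `OUT`, to its target with `IN`, and to both with `ALL`, so a vertex shows up in the result only if it has at least one edge in the chosen direction.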

#### Neighbor-Based Operations

Operate on neighboring vertices.

```java { .api }
public <T> DataSet<T> groupReduceOnNeighbors(
    NeighborsFunction<K, VV, EV, T> neighborsFunction,
    EdgeDirection direction)

public <T> DataSet<T> groupReduceOnNeighbors(
    NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
    EdgeDirection direction)

// Returns one (vertexId, reducedNeighborValue) pair per vertex
public DataSet<Tuple2<K, VV>> reduceOnNeighbors(
    ReduceNeighborsFunction<VV> reduceNeighborsFunction,
    EdgeDirection direction)
```

**Usage Example:**

```java
// Count outgoing edges for each vertex
DataSet<Tuple2<Long, Integer>> outDegrees = graph.groupReduceOnEdges(
    new EdgesFunction<Long, Double, Tuple2<Long, Integer>>() {
        @Override
        public void iterateEdges(
                Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
                Collector<Tuple2<Long, Integer>> out) {

            int count = 0;
            Long vertexId = null;

            for (Tuple2<Long, Edge<Long, Double>> edge : edges) {
                vertexId = edge.f0; // ID of the vertex that owns this neighborhood
                count++;
            }

            out.collect(new Tuple2<>(vertexId, count));
        }
    },
    EdgeDirection.OUT
);

// Concatenate neighbor values
DataSet<Tuple2<Long, String>> neighborValues = graph.groupReduceOnNeighbors(
    new NeighborsFunction<Long, String, Double, Tuple2<Long, String>>() {
        @Override
        public void iterateNeighbors(
                Iterable<Tuple3<Long, Edge<Long, Double>, Vertex<Long, String>>> neighbors,
                Collector<Tuple2<Long, String>> out) {

            StringBuilder joined = new StringBuilder();
            Long vertexId = null;

            for (Tuple3<Long, Edge<Long, Double>, Vertex<Long, String>> neighbor : neighbors) {
                vertexId = neighbor.f0;                             // f0 = owning vertex ID
                joined.append(neighbor.f2.getValue()).append(" "); // f1 = edge, f2 = neighbor vertex
            }

            out.collect(new Tuple2<>(vertexId, joined.toString().trim()));
        }
    },
    EdgeDirection.ALL
);
```

### Function Interfaces

Core interfaces for neighborhood operations.

#### EdgesFunction<K, EV, O>

Process edges incident to each vertex.

```java { .api }
public interface EdgesFunction<K, EV, O> extends Function, Serializable {
    void iterateEdges(
        Iterable<Tuple2<K, Edge<K, EV>>> edges,
        Collector<O> out) throws Exception;
}
```

#### EdgesFunctionWithVertexValue<K, VV, EV, T>

Process edges with access to the vertex value.

```java { .api }
public interface EdgesFunctionWithVertexValue<K, VV, EV, T> extends Function, Serializable {
    void iterateEdges(
        Vertex<K, VV> vertex,
        Iterable<Edge<K, EV>> edges,
        Collector<T> out) throws Exception;
}
```

#### NeighborsFunction<K, VV, EV, T>

Process neighboring vertices and connecting edges. Each element carries the owning vertex ID, the connecting edge, and the neighbor vertex, in that order.

```java { .api }
public interface NeighborsFunction<K, VV, EV, T> extends Function, Serializable {
    void iterateNeighbors(
        Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors,
        Collector<T> out) throws Exception;
}
```

#### NeighborsFunctionWithVertexValue<K, VV, EV, T>

Process neighbors with access to the source vertex value. Each element carries the connecting edge followed by the neighbor vertex.

```java { .api }
public interface NeighborsFunctionWithVertexValue<K, VV, EV, T> extends Function, Serializable {
    void iterateNeighbors(
        Vertex<K, VV> vertex,
        Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
        Collector<T> out) throws Exception;
}
```

#### Reduce Functions

Simple reduction operations on edges and neighbors.

```java { .api }
public interface ReduceEdgesFunction<EV> extends Function, Serializable {
    EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue) throws Exception;
}

public interface ReduceNeighborsFunction<VV> extends Function, Serializable {
    VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue) throws Exception;
}
```

**Usage Example:**

```java
// Reduce edge weights to find maximum outgoing edge weight per vertex
DataSet<Tuple2<Long, Double>> maxWeights = graph.reduceOnEdges(
    new ReduceEdgesFunction<Double>() {
        @Override
        public Double reduceEdges(Double first, Double second) {
            return Math.max(first, second);
        }
    },
    EdgeDirection.OUT
);

// Reduce neighbor values (concatenate strings)
DataSet<Tuple2<Long, String>> concatenatedNeighbors = graph.reduceOnNeighbors(
    new ReduceNeighborsFunction<String>() {
        @Override
        public String reduceNeighbors(String first, String second) {
            return first + "_" + second;
        }
    },
    EdgeDirection.ALL
);
```
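
As a rough mental model of what a per-vertex reduction computes, the plain-Java sketch below folds the values of each vertex's outgoing edges pairwise. It has no Flink dependency, and `ReduceOnEdgesSketch` and `reduceOutEdges` are hypothetical names chosen for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

class ReduceOnEdgesSketch {

    // Folds the values of each vertex's outgoing edges pairwise with the given reducer.
    // sources.get(i) is the source vertex of the edge whose value is values.get(i).
    static Map<Long, Double> reduceOutEdges(List<Long> sources,
                                            List<Double> values,
                                            BinaryOperator<Double> reducer) {
        Map<Long, Double> result = new TreeMap<>();
        for (int i = 0; i < sources.size(); i++) {
            // merge() stores the first value as-is and applies the reducer to later ones
            result.merge(sources.get(i), values.get(i), reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> sources = Arrays.asList(1L, 1L, 2L);
        List<Double> values = Arrays.asList(0.5, 2.0, 1.5);

        Map<Long, Double> max = reduceOutEdges(sources, values, (a, b) -> Math.max(a, b));
        System.out.println(max); // {1=2.0, 2=1.5}
    }
}
```

Because a distributed runtime may combine values in any order, a reducer used this way should be associative and commutative. `Math.max` is; the string concatenation in the example above is not commutative, so the order of the concatenated parts can vary between runs.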

### Graph Joins

Join graph components with external DataSets. The join function receives the current value and the matching input value and returns the new value, which has the same type as the current one.

#### Vertex Joins

Join vertices with external data, matching on vertex ID.

```java { .api }
public <T> Graph<K, VV, EV> joinWithVertices(
    DataSet<Tuple2<K, T>> inputDataSet,
    VertexJoinFunction<VV, T> vertexJoinFunction)
```

#### Edge Joins

Join edges with external data, matching on the (source ID, target ID) pair.

```java { .api }
public <T> Graph<K, VV, EV> joinWithEdges(
    DataSet<Tuple3<K, K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction)
```

#### Join Function Interfaces

```java { .api }
public interface VertexJoinFunction<VV, T> extends Function, Serializable {
    VV vertexJoin(VV vertexValue, T inputValue) throws Exception;
}

public interface EdgeJoinFunction<EV, T> extends Function, Serializable {
    EV edgeJoin(EV edgeValue, T inputValue) throws Exception;
}
```

**Usage Example:**

```java
// External vertex data
DataSet<Tuple2<Long, Integer>> vertexAges = env.fromElements(
    new Tuple2<>(1L, 25),
    new Tuple2<>(2L, 30),
    new Tuple2<>(3L, 35)
);

// Join with vertices to add age information
Graph<Long, String, Double> enrichedGraph = graph.joinWithVertices(
    vertexAges,
    new VertexJoinFunction<String, Integer>() {
        @Override
        public String vertexJoin(String name, Integer age) {
            return name + "_age" + age;
        }
    }
);

// External edge data
DataSet<Tuple3<Long, Long, String>> edgeLabels = env.fromElements(
    new Tuple3<>(1L, 2L, "friendship"),
    new Tuple3<>(2L, 3L, "colleague")
);

// Join with edges to add labels. edgeJoin must return the graph's edge type,
// so the Double weights are mapped to String before joining.
Graph<Long, String, String> labeledGraph = enrichedGraph
    .mapEdges(new MapFunction<Edge<Long, Double>, String>() {
        @Override
        public String map(Edge<Long, Double> edge) {
            return String.valueOf(edge.getValue());
        }
    })
    .joinWithEdges(
        edgeLabels,
        new EdgeJoinFunction<String, String>() {
            @Override
            public String edgeJoin(String weight, String label) {
                return label + "_" + weight;
            }
        }
    );
```
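
The plain-Java sketch below models what a vertex join does to the vertex set. It assumes that a vertex with no matching input entry keeps its current value, which is how I understand Gelly's join to behave; treat that as an assumption to verify against your Flink version. `VertexJoinSketch` and its `joinWithVertices` helper are hypothetical and have no Flink dependency.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;

class VertexJoinSketch {

    // Applies joinFn to every vertex whose ID appears in input.
    // Assumption: vertices without a match are copied unchanged.
    static <VV, T> Map<Long, VV> joinWithVertices(Map<Long, VV> vertices,
                                                  Map<Long, T> input,
                                                  BiFunction<VV, T, VV> joinFn) {
        Map<Long, VV> result = new TreeMap<>();
        for (Map.Entry<Long, VV> vertex : vertices.entrySet()) {
            T match = input.get(vertex.getKey());
            result.put(vertex.getKey(),
                    match == null ? vertex.getValue() : joinFn.apply(vertex.getValue(), match));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, String> vertices = new TreeMap<>();
        vertices.put(1L, "alice");
        vertices.put(2L, "bob");

        Map<Long, Integer> ages = new TreeMap<>();
        ages.put(1L, 25);

        System.out.println(
                joinWithVertices(vertices, ages, (name, age) -> name + "_age" + age));
        // {1=alice_age25, 2=bob}
    }
}
```

Note that the join function's return type is the vertex value type itself, which is why a join alone cannot change the value type of a graph.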

### Graph Validation

Check graph structure and properties with a `GraphValidator`. `validate` returns whether the graph passed the check.

```java { .api }
public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception

public abstract class GraphValidator<K, VV, EV> implements Serializable {
    public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
}
```

**Usage Example:**

```java
// Custom validator to check for self-loops
GraphValidator<Long, String, Double> noSelfLoopValidator =
    new GraphValidator<Long, String, Double>() {
        @Override
        public boolean validate(Graph<Long, String, Double> graph) throws Exception {
            DataSet<Edge<Long, Double>> selfLoops = graph.getEdges()
                .filter(edge -> edge.getSource().equals(edge.getTarget()));

            // count() triggers a job execution
            long selfLoopCount = selfLoops.count();
            return selfLoopCount == 0;
        }
    };

// Validate graph
boolean isValid = graph.validate(noSelfLoopValidator);
```
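
Another common structural check is that every edge endpoint refers to an existing vertex; Gelly ships an `InvalidVertexIdsValidator` for this. The plain-Java sketch below only models the idea on in-memory collections. `VertexIdCheckSketch` and `hasOnlyValidVertexIds` are hypothetical names, and the code is not Gelly's implementation.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class VertexIdCheckSketch {

    // Returns true when every edge endpoint is a known vertex ID.
    // Each edge is {sourceId, targetId}.
    static boolean hasOnlyValidVertexIds(Set<Long> vertexIds, List<long[]> edges) {
        for (long[] edge : edges) {
            if (!vertexIds.contains(edge[0]) || !vertexIds.contains(edge[1])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Set<Long> vertexIds = new HashSet<>(Arrays.asList(1L, 2L, 3L));

        List<long[]> validEdges = Arrays.asList(new long[] {1, 2}, new long[] {2, 3});
        List<long[]> danglingEdge = Collections.singletonList(new long[] {1, 4});

        System.out.println(hasOnlyValidVertexIds(vertexIds, validEdges));   // true
        System.out.println(hasOnlyValidVertexIds(vertexIds, danglingEdge)); // false
    }
}
```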

### Utility Classes

Conversion utilities for different data formats.

#### Tuple Conversion Utilities

```java { .api }
// Convert between tuples and graph types
public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>>
public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>>
public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>>
public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>>
```

**Usage Example:**

```java
// Convert graph to tuple format for export
DataSet<Tuple2<Long, String>> vertexTuples = graph.getVertices()
    .map(new VertexToTuple2Map<Long, String>());

DataSet<Tuple3<Long, Long, Double>> edgeTuples = graph.getEdges()
    .map(new EdgeToTuple3Map<Long, Double>());

// Export to CSV (sinks are lazy; they run when env.execute() is called)
vertexTuples.writeAsCsv("vertices.csv");
edgeTuples.writeAsCsv("edges.csv");

// Import from tuples
DataSet<Vertex<Long, String>> importedVertices = vertexTuples
    .map(new Tuple2ToVertexMap<Long, String>());

DataSet<Edge<Long, Double>> importedEdges = edgeTuples
    .map(new Tuple3ToEdgeMap<Long, Double>());

Graph<Long, String, Double> importedGraph = Graph.fromDataSet(
    importedVertices, importedEdges, env);
```

### Integration with Flink DataSet API

Vertices and edges are ordinary DataSets, so any DataSet transformation can be applied to them and the result turned back into a graph.

**Usage Example:**

```java
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.Types;

// Use Flink operations on graph components
DataSet<Vertex<Long, String>> vertices = graph.getVertices();

// Standard DataSet operations; returns(...) supplies the generic result type
// that Java lambdas lose to type erasure
DataSet<Vertex<Long, String>> filteredVertices = vertices
    .filter(vertex -> vertex.getValue().length() > 3)
    .map(vertex -> new Vertex<>(vertex.getId(), vertex.getValue().toUpperCase()))
    .returns(new TypeHint<Vertex<Long, String>>() {});

// Group operations
DataSet<Tuple2<String, Long>> vertexGroups = vertices
    .map(vertex -> new Tuple2<>(vertex.getValue().substring(0, 1), 1L))
    .returns(Types.TUPLE(Types.STRING, Types.LONG))
    .groupBy(0)
    .sum(1);

// Join with other DataSets
DataSet<Tuple2<Long, Integer>> externalData = /* ... */;
DataSet<Tuple3<Long, String, Integer>> joined = vertices
    .join(externalData)
    .where("f0").equalTo("f0")
    .with((vertex, data) -> new Tuple3<>(vertex.getId(), vertex.getValue(), data.f1))
    .returns(Types.TUPLE(Types.LONG, Types.STRING, Types.INT));

// Create new graph from processed data
// (edges that reference filtered-out vertices are not removed here)
Graph<Long, String, Double> processedGraph = Graph.fromDataSet(
    filteredVertices, graph.getEdges(), env);
```
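
### Performance Optimization

#### Caching Graph Components

```java
// The DataSet API has no cache() method. Within a single job, a DataSet that
// feeds several downstream operators is computed once and shared between them.
DataSet<Vertex<Long, String>> vertices = graph.getVertices();
DataSet<Edge<Long, Double>> edges = graph.getEdges();

// Both maps read from the same vertices DataSet
DataSet<String> values1 = vertices.map(v -> v.getValue());
DataSet<String> values2 = vertices.map(v -> v.getValue().toUpperCase());

// Each print(), collect(), or count() call starts its own job and recomputes
// its inputs, so prefer sinks plus a single env.execute() when reusing data.
```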

#### Efficient Data Types

```java
// Mutable value types (org.apache.flink.types) can be reused across records,
// which reduces object allocation and garbage-collection pressure
Graph<LongValue, StringValue, DoubleValue> efficientGraph = /* ... */;

// Boxed and immutable Java types allocate new objects for each record,
// which adds up in large-scale operations
Graph<Long, String, Double> lessEfficientGraph = /* ... */;
```

#### Parallel Processing

```java
// Configure parallelism for data access operations
env.setParallelism(8);

// Operations will use configured parallelism
DataSet<Vertex<Long, String>> vertices = graph.getVertices(); // Uses parallelism 8
```