# Example Implementations

Standalone example implementations of graph processing algorithms and programming patterns. The examples show different approaches to graph analysis with Apache Flink's Gelly API and serve as reference implementations for common use cases.

## Capabilities

### Java Examples

Standalone Java implementations demonstrating different algorithm approaches and Flink programming patterns.

#### PageRank Implementation

Generic PageRank implementation using the scatter-gather pattern, with a configurable damping factor and iteration limit.

```java { .api }
/**
 * PageRank algorithm implementation with generic key type support
 * Uses scatter-gather pattern for distributed computation
 * @param <K> Vertex key type
 */
public class PageRank<K> {
    /**
     * Create PageRank instance with specified parameters
     * @param beta Damping factor (typically 0.85)
     * @param maxIterations Maximum number of iterations
     */
    public PageRank(double beta, int maxIterations);

    /**
     * Execute PageRank algorithm on input graph
     * @param network Input graph with double-valued vertices and edges
     * @return DataSet of vertices with PageRank scores
     */
    public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network);
}
```

**Usage Examples:**

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.examples.PageRank;
import org.apache.flink.types.NullValue;

// Create PageRank instance
PageRank<Long> pageRank = new PageRank<>(0.85, 10);

// Prepare graph with initial values
// (inputGraph is assumed to be an existing Graph<Long, NullValue, Double>)
Graph<Long, Double, Double> graph = inputGraph
    .mapVertices(new MapFunction<Vertex<Long, NullValue>, Double>() {
        @Override
        public Double map(Vertex<Long, NullValue> vertex) {
            return 1.0; // Initial PageRank value
        }
    });

// Execute algorithm
DataSet<Vertex<Long, Double>> results = pageRank.run(graph);
results.print();
```
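
The scatter-gather rounds can be sketched in plain Java, without a Flink runtime, to show what each iteration computes. This is a minimal illustration rather than the example's actual code: the class `PageRankSketch`, the adjacency-map input, the uniform `1/n` starting rank, and the update rule `(1 - beta) / n + beta * received` are all assumptions made for the sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PageRankSketch {

    /**
     * Runs a fixed number of scatter-gather rounds.
     * Every vertex must appear as a key in outEdges, even if it has no out-edges.
     */
    static Map<Long, Double> rank(Map<Long, List<Long>> outEdges, double beta, int maxIterations) {
        int n = outEdges.size();
        Map<Long, Double> ranks = new HashMap<>();
        for (Long v : outEdges.keySet()) {
            ranks.put(v, 1.0 / n); // assumed uniform starting rank
        }
        for (int i = 0; i < maxIterations; i++) {
            // Scatter: each vertex sends an equal share of its rank along every out-edge.
            Map<Long, Double> received = new HashMap<>();
            for (Map.Entry<Long, List<Long>> e : outEdges.entrySet()) {
                List<Long> targets = e.getValue();
                if (targets.isEmpty()) {
                    continue; // dangling vertices send nothing in this sketch
                }
                double share = ranks.get(e.getKey()) / targets.size();
                for (Long t : targets) {
                    received.merge(t, share, Double::sum);
                }
            }
            // Gather: each vertex combines the received shares with the teleport term.
            Map<Long, Double> next = new HashMap<>();
            for (Long v : outEdges.keySet()) {
                next.put(v, (1 - beta) / n + beta * received.getOrDefault(v, 0.0));
            }
            ranks = next;
        }
        return ranks;
    }

    public static void main(String[] args) {
        // A three-vertex cycle: 1 -> 2 -> 3 -> 1.
        Map<Long, List<Long>> outEdges = new HashMap<>();
        outEdges.put(1L, Arrays.asList(2L));
        outEdges.put(2L, Arrays.asList(3L));
        outEdges.put(3L, Arrays.asList(1L));
        System.out.println(rank(outEdges, 0.85, 10));
    }
}
```

On a symmetric cycle like the one in `main`, every vertex keeps a rank of 1/3 in each round, which makes it a convenient sanity check.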

#### Single Source Shortest Paths

Standard implementation of single source shortest paths using iterative vertex-centric computation.

```java { .api }
/**
 * Single Source Shortest Paths algorithm
 * Computes shortest distances from a source vertex to all reachable vertices
 */
public class SingleSourceShortestPaths {
    /**
     * Execute SSSP from specified source vertex
     * @param graph Input graph with numeric edge weights
     * @param srcVertexId Source vertex identifier
     * @param maxIterations Maximum number of iterations
     * @return DataSet of vertices with shortest distances
     */
    public static DataSet<Vertex<Long, Double>> run(
        Graph<Long, NullValue, Double> graph,
        Long srcVertexId,
        Integer maxIterations
    ) throws Exception;
}
```

**Usage Examples:**

```java
// Execute SSSP from vertex 1
// (weightedGraph is assumed to be an existing Graph<Long, NullValue, Double>)
DataSet<Vertex<Long, Double>> distances =
    SingleSourceShortestPaths.run(weightedGraph, 1L, 10);

// Keep only reachable vertices (those with a finite distance)
DataSet<Vertex<Long, Double>> reachable = distances
    .filter(vertex -> vertex.getValue() < Double.POSITIVE_INFINITY);
```
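
The iterative vertex-centric computation can be illustrated with a small plain-Java sketch that runs synchronous rounds on an in-memory graph. The class `SsspSketch`, the nested-map edge representation, and the assumption of non-negative weights are illustrative choices, not part of the Gelly example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SsspSketch {

    /** outEdges maps source -> (target -> weight); weights are assumed non-negative. */
    static Map<Long, Double> run(Map<Long, Map<Long, Double>> outEdges, long src, int maxIterations) {
        // Every vertex starts at infinity except the source.
        Map<Long, Double> dist = new HashMap<>();
        for (Map.Entry<Long, Map<Long, Double>> e : outEdges.entrySet()) {
            dist.put(e.getKey(), Double.POSITIVE_INFINITY);
            for (Long t : e.getValue().keySet()) {
                dist.put(t, Double.POSITIVE_INFINITY);
            }
        }
        dist.put(src, 0.0);

        Set<Long> active = new HashSet<>();
        active.add(src);
        for (int i = 0; i < maxIterations && !active.isEmpty(); i++) {
            // Messaging: vertices updated in the previous round offer distance + weight.
            Map<Long, Double> inbox = new HashMap<>();
            for (Long v : active) {
                Map<Long, Double> out = outEdges.getOrDefault(v, Collections.emptyMap());
                for (Map.Entry<Long, Double> edge : out.entrySet()) {
                    inbox.merge(edge.getKey(), dist.get(v) + edge.getValue(), Math::min);
                }
            }
            // Update: a vertex accepts an offer only if it improves its distance.
            Set<Long> changed = new HashSet<>();
            for (Map.Entry<Long, Double> m : inbox.entrySet()) {
                if (m.getValue() < dist.get(m.getKey())) {
                    dist.put(m.getKey(), m.getValue());
                    changed.add(m.getKey());
                }
            }
            active = changed;
        }
        return dist;
    }

    public static void main(String[] args) {
        Map<Long, Map<Long, Double>> outEdges = new HashMap<>();
        outEdges.put(1L, new HashMap<>());
        outEdges.get(1L).put(2L, 1.0);
        outEdges.get(1L).put(3L, 5.0);
        outEdges.put(2L, new HashMap<>());
        outEdges.get(2L).put(3L, 2.0);
        System.out.println(run(outEdges, 1L, 10));
    }
}
```

Vertices that the source cannot reach keep `Double.POSITIVE_INFINITY`, which is the same convention the filter in the usage example relies on.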

#### Gather-Sum-Apply Examples

Examples demonstrating the Gather-Sum-Apply (GSA) programming model.

##### GSA PageRank

```java { .api }
/**
 * PageRank implementation using Gather-Sum-Apply pattern
 */
public class GSAPageRank {
    public static void main(String[] args) throws Exception;

    /** Custom gather function for PageRank computation */
    public static class GatherRanks extends GatherFunction<Double, Double, Double>;

    /** Custom sum function for aggregating ranks */
    public static class SumRanks extends SumFunction<Double, Double, Double>;

    /** Custom apply function for updating vertex values */
    public static class UpdateRanks extends ApplyFunction<Long, Double, Double>;
}
```

##### GSA Single Source Shortest Paths

```java { .api }
/**
 * SSSP implementation using Gather-Sum-Apply pattern
 */
public class GSASingleSourceShortestPaths {
    public static void main(String[] args) throws Exception;

    /** Gather minimum distances from neighbors */
    public static class GatherDistances extends GatherFunction<Double, Double, Double>;

    /** Select minimum distance */
    public static class SelectMinDistance extends SumFunction<Double, Double, Double>;

    /** Update vertex distance if improvement found */
    public static class UpdateDistance extends ApplyFunction<Long, Double, Double>;
}
```
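
The three GSA phases for shortest paths can be sketched as three small functions plus a driver that runs one superstep. This is a plain-Java illustration under stated assumptions: `GsaSsspSketch`, `WeightedEdge`, and `superstep` are hypothetical names, and the sketch assumes every vertex already has an entry in the distance map.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GsaSsspSketch {

    /** Hypothetical stand-in for a directed, weighted edge. */
    static final class WeightedEdge {
        final long source;
        final long target;
        final double weight;

        WeightedEdge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Gather: the candidate distance offered by one in-neighbor over one edge. */
    static double gather(double neighborDistance, double edgeWeight) {
        return neighborDistance + edgeWeight;
    }

    /** Sum: combine two candidates; for SSSP this is the minimum, not an addition. */
    static double sum(double a, double b) {
        return Math.min(a, b);
    }

    /** Apply: keep the current distance unless the combined candidate improves it. */
    static double apply(double current, double combined) {
        return Math.min(current, combined);
    }

    /** Runs one GSA superstep and returns the new distances; the input map is not modified. */
    static Map<Long, Double> superstep(Map<Long, Double> dist, List<WeightedEdge> edges) {
        Map<Long, Double> combined = new HashMap<>();
        for (WeightedEdge e : edges) {
            double candidate = gather(dist.get(e.source), e.weight);
            combined.merge(e.target, candidate, GsaSsspSketch::sum);
        }
        Map<Long, Double> next = new HashMap<>(dist);
        for (Map.Entry<Long, Double> c : combined.entrySet()) {
            next.put(c.getKey(), apply(dist.get(c.getKey()), c.getValue()));
        }
        return next;
    }

    public static void main(String[] args) {
        Map<Long, Double> dist = new HashMap<>();
        dist.put(1L, 0.0);
        dist.put(2L, Double.POSITIVE_INFINITY);
        dist.put(3L, Double.POSITIVE_INFINITY);
        List<WeightedEdge> edges = Arrays.asList(
            new WeightedEdge(1, 2, 1.0),
            new WeightedEdge(2, 3, 2.0),
            new WeightedEdge(1, 3, 5.0));
        Map<Long, Double> afterOne = superstep(dist, edges);
        Map<Long, Double> afterTwo = superstep(afterOne, edges);
        System.out.println(afterOne + " then " + afterTwo);
    }
}
```

The sum phase is named for the general model; for shortest paths the "sum" is a minimum, which matches the `SelectMinDistance` name in the listing above.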

#### Incremental SSSP

Advanced implementation demonstrating incremental graph processing with dynamic updates.

```java { .api }
/**
 * Incremental Single Source Shortest Paths
 * Efficiently recomputes shortest paths when graph structure changes
 */
public class IncrementalSSSP {
    public static void main(String[] args) throws Exception;

    /**
     * Process incremental updates to shortest path tree
     * @param graph Base graph with current distances
     * @param edgeUpdates DataSet of edge additions/deletions
     * @return Updated shortest path tree
     */
    public static DataSet<Vertex<Long, Double>> processUpdates(
        Graph<Long, Double, Double> graph,
        DataSet<Edge<Long, Double>> edgeUpdates
    ) throws Exception;
}
```
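
The idea of repairing existing distances instead of recomputing them from scratch can be sketched for the simplest update, a single edge addition. This is only an illustration and not the example's algorithm: `IncrementalSsspSketch` and `addEdge` are hypothetical names, edge deletions are not handled, weights are assumed non-negative, and `dist` is assumed to already hold correct distances for every vertex.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class IncrementalSsspSketch {

    /**
     * Adds edge (u -> v, w) to outEdges and repairs dist in place.
     * Only vertices whose distance actually improves are revisited.
     */
    static void addEdge(Map<Long, Map<Long, Double>> outEdges, Map<Long, Double> dist,
                        long u, long v, double w) {
        outEdges.computeIfAbsent(u, k -> new HashMap<>()).put(v, w);
        if (dist.get(u) + w >= dist.get(v)) {
            return; // the new edge does not shorten any path
        }
        dist.put(v, dist.get(u) + w);

        // Propagate the improvement downstream from v.
        Deque<Long> queue = new ArrayDeque<>();
        queue.add(v);
        while (!queue.isEmpty()) {
            long x = queue.poll();
            Map<Long, Double> out = outEdges.getOrDefault(x, Collections.emptyMap());
            for (Map.Entry<Long, Double> edge : out.entrySet()) {
                double candidate = dist.get(x) + edge.getValue();
                if (candidate < dist.get(edge.getKey())) {
                    dist.put(edge.getKey(), candidate);
                    queue.add(edge.getKey());
                }
            }
        }
    }

    public static void main(String[] args) {
        Map<Long, Map<Long, Double>> outEdges = new HashMap<>();
        outEdges.computeIfAbsent(1L, k -> new HashMap<>()).put(2L, 10.0);
        outEdges.computeIfAbsent(2L, k -> new HashMap<>()).put(3L, 1.0);
        outEdges.computeIfAbsent(1L, k -> new HashMap<>()).put(4L, 1.0);
        Map<Long, Double> dist = new HashMap<>();
        dist.put(1L, 0.0);
        dist.put(2L, 10.0);
        dist.put(3L, 11.0);
        dist.put(4L, 1.0);
        addEdge(outEdges, dist, 4L, 2L, 2.0); // opens a shorter route to 2 and 3
        System.out.println(dist);
    }
}
```

An edge deletion is harder because distances can grow, so the affected part of the shortest path tree has to be invalidated before it is recomputed; that case is left out of this sketch.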

#### Music Profiles

Real-world example demonstrating a graph-based recommendation system using collaborative filtering.

```java { .api }
/**
 * Music recommendation system using graph processing
 * Builds user-song bipartite graph and computes recommendations
 */
public class MusicProfiles {
    public static void main(String[] args) throws Exception;

    /**
     * Generate music recommendations based on user listening history
     * @param userSongGraph Bipartite graph of users and songs
     * @param targetUser User to generate recommendations for
     * @return Ranked list of song recommendations
     */
    public static DataSet<Tuple2<String, Double>> generateRecommendations(
        Graph<String, NullValue, Double> userSongGraph,
        String targetUser
    ) throws Exception;
}
```

#### Euclidean Graph Weighing

Example demonstrating spatial graph processing with geometric distance calculations.

```java { .api }
/**
 * Weight graph edges by Euclidean distance between vertex coordinates
 * Useful for spatial networks and geographic data processing
 */
public class EuclideanGraphWeighing {
    public static void main(String[] args) throws Exception;

    /**
     * Compute Euclidean distances for graph edges
     * @param graph Graph with vertex coordinates
     * @return Graph with distance-weighted edges
     */
    public static Graph<Long, Point, Double> computeDistances(
        Graph<Long, Point, NullValue> graph
    ) throws Exception;
}
```
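
The edge-weighting step amounts to looking up both endpoint coordinates and taking their Euclidean distance. The following plain-Java sketch shows that computation on in-memory data; `EuclideanWeighingSketch`, its `Point` class, and the `long[][]` edge list are hypothetical stand-ins rather than the example's types.

```java
import java.util.HashMap;
import java.util.Map;

class EuclideanWeighingSketch {

    /** Hypothetical 2D point; not the Point type used by the Gelly example. */
    static final class Point {
        final double x;
        final double y;

        Point(double x, double y) {
            this.x = x;
            this.y = y;
        }

        double distanceTo(Point other) {
            return Math.hypot(x - other.x, y - other.y);
        }
    }

    /**
     * Returns one weight per edge, in edge order.
     * Each edge is given as {sourceId, targetId}; both ids must have coordinates.
     */
    static double[] weigh(Map<Long, Point> coords, long[][] edges) {
        double[] weights = new double[edges.length];
        for (int i = 0; i < edges.length; i++) {
            Point source = coords.get(edges[i][0]);
            Point target = coords.get(edges[i][1]);
            weights[i] = source.distanceTo(target);
        }
        return weights;
    }

    public static void main(String[] args) {
        Map<Long, Point> coords = new HashMap<>();
        coords.put(1L, new Point(0, 0));
        coords.put(2L, new Point(3, 4));
        coords.put(3L, new Point(3, 0));
        double[] weights = weigh(coords, new long[][] {{1, 2}, {2, 3}});
        for (double w : weights) {
            System.out.println(w);
        }
    }
}
```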

#### Pregel SSSP

Implementation showcasing the Pregel programming model for vertex-centric computation.

```java { .api }
/**
 * Single Source Shortest Paths using Pregel model
 * Demonstrates vertex-centric programming approach
 */
public class PregelSSSP {
    public static void main(String[] args) throws Exception;

    /** Pregel compute function for SSSP */
    public static class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double>;

    /** Message combiner for efficiency */
    public static class MinMessageCombiner extends MessageCombiner<Long, Double>;
}
```
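
What a min-combiner buys can be seen in isolation: all messages addressed to the same vertex collapse into one before delivery, and for shortest paths only the smallest candidate matters. The sketch below is plain Java with hypothetical names (`MinCombinerSketch`, `combine`) and parallel arrays standing in for a message stream.

```java
import java.util.HashMap;
import java.util.Map;

class MinCombinerSketch {

    /**
     * Collapses the messages addressed to each target vertex into their minimum.
     * targets[i] is the recipient of values[i].
     */
    static Map<Long, Double> combine(long[] targets, double[] values) {
        Map<Long, Double> combined = new HashMap<>();
        for (int i = 0; i < targets.length; i++) {
            combined.merge(targets[i], values[i], Math::min);
        }
        return combined;
    }

    public static void main(String[] args) {
        long[] targets = {2, 3, 2, 3, 3};
        double[] values = {5.0, 7.0, 1.0, 9.0, 4.0};
        Map<Long, Double> combined = combine(targets, values);
        // Five messages go in; one message per distinct target comes out.
        System.out.println(targets.length + " messages reduced to " + combined.size());
    }
}
```

Combining is safe here because taking a minimum is associative and commutative, so the order in which messages are merged does not change the result.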

### Scala Examples

Scala implementations demonstrating functional programming approaches to graph processing.

#### Scala Connected Components

```scala { .api }
/**
 * Connected Components implementation in Scala
 * Demonstrates functional programming style with Flink Scala API
 */
object ConnectedComponents {
  def main(args: Array[String]): Unit

  /**
   * Find connected components using label propagation
   * @param graph Input graph
   * @param maxIterations Maximum iterations
   * @return Components as vertex-component pairs
   */
  def findComponents(
    graph: Graph[Long, Long, NullValue],
    maxIterations: Int
  ): DataSet[(Long, Long)]
}
```
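
Label propagation for connected components can be sketched in a few lines of plain Java: every vertex starts with its own id as label and repeatedly adopts the smallest label seen across its edges. The names `LabelPropagationSketch` and `findComponents`, the undirected `long[][]` edge list, and the synchronous update order are assumptions of this sketch; vertices that appear in no edge are not represented.

```java
import java.util.HashMap;
import java.util.Map;

class LabelPropagationSketch {

    /**
     * Each edge is an undirected pair {a, b}.
     * Returns vertex -> label; once converged, the label is the smallest id in the component.
     */
    static Map<Long, Long> findComponents(long[][] edges, int maxIterations) {
        Map<Long, Long> label = new HashMap<>();
        for (long[] e : edges) {
            label.put(e[0], e[0]);
            label.put(e[1], e[1]);
        }
        for (int i = 0; i < maxIterations; i++) {
            boolean changed = false;
            // Synchronous round: read labels from the previous round, write into a copy.
            Map<Long, Long> next = new HashMap<>(label);
            for (long[] e : edges) {
                long min = Math.min(label.get(e[0]), label.get(e[1]));
                if (min < next.get(e[0])) {
                    next.put(e[0], min);
                    changed = true;
                }
                if (min < next.get(e[1])) {
                    next.put(e[1], min);
                    changed = true;
                }
            }
            label = next;
            if (!changed) {
                break; // converged before the iteration limit
            }
        }
        return label;
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {2, 3}, {4, 5}};
        System.out.println(findComponents(edges, 10));
    }
}
```

If `maxIterations` is smaller than the longest chain a label has to travel, the result can still contain several labels inside one component, which is why the iteration limit matters for this style of algorithm.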

#### Scala GSA SSSP

```scala { .api }
/**
 * GSA Single Source Shortest Paths in Scala
 * Functional implementation of gather-sum-apply pattern
 */
object GSASingleSourceShortestPaths {
  def main(args: Array[String]): Unit

  /**
   * Compute shortest paths using functional GSA approach
   * @param graph Weighted graph
   * @param srcVertex Source vertex
   * @return Shortest distances from source
   */
  def computeShortestPaths(
    graph: Graph[Long, Double, Double],
    srcVertex: Long
  ): DataSet[(Long, Double)]
}
```

#### Scala SSSP

```scala { .api }
/**
 * Standard SSSP implementation in Scala
 * Demonstrates idiomatic Scala programming with Flink
 */
object SingleSourceShortestPaths {
  def main(args: Array[String]): Unit

  // Functional vertex update operations
  def updateDistance(vertex: Vertex[Long, Double], messages: Iterator[Double]): Double

  // Distance initialization
  def initializeDistances(srcVertex: Long): MapFunction[Vertex[Long, NullValue], Double]
}
```

## Data Classes and Test Utilities

### Test Data Generators

Collection of utility classes providing default datasets for testing and benchmarking examples.

```java { .api }
/**
 * Default datasets for PageRank testing
 */
public class PageRankData {
    /** Get default vertex dataset */
    public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env);

    /** Get default edge dataset */
    public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);

    /** Expected PageRank results for validation */
    public static String getExpectedResult();
}

/**
 * Test data for connected components algorithms
 */
public class ConnectedComponentsDefaultData {
    public static DataSet<Vertex<Long, Long>> getDefaultVertexDataSet(ExecutionEnvironment env);
    public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env);
    public static String getExpectedResult();
}

/**
 * Spatial graph data for Euclidean distance examples
 */
public class EuclideanGraphData {
    /** Vertices with 2D coordinates */
    public static DataSet<Vertex<Long, Point>> getDefaultVertexDataSet(ExecutionEnvironment env);

    /** Edges for spatial connectivity */
    public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}

/**
 * Music recommendation test data
 */
public class MusicProfilesData {
    /** User-song interaction data */
    public static DataSet<Tuple3<String, String, Double>> getUserSongRatings(ExecutionEnvironment env);

    /** Expected recommendation results */
    public static String getExpectedResult();
}
```

## Usage Patterns

### Standalone Example Execution

```java
// Direct execution of example algorithms
public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Create test graph
    // (vertices and edges are assumed to be existing DataSet<Vertex<Long, NullValue>>
    // and DataSet<Edge<Long, Double>> instances)
    Graph<Long, NullValue, Double> graph = Graph.fromDataSet(
        vertices, edges, env
    );

    // Run algorithm
    DataSet<Vertex<Long, Double>> result =
        SingleSourceShortestPaths.run(graph, 1L, 10);

    // Print results
    result.print();
}
```

### Integration with Framework

```java
// Use examples as reference for custom drivers
public class CustomPageRankDriver extends DriverBase<Long, NullValue, Double> {
    private PageRank<Long> pageRankImpl;

    @Override
    public DataSet plan(Graph<Long, NullValue, Double> graph) throws Exception {
        // Initialize PageRank with parameters
        // (dampingFactor and iterations are assumed to be parameter fields of this driver)
        pageRankImpl = new PageRank<>(
            dampingFactor.getValue(),
            iterations.getValue()
        );

        // Convert graph format and execute
        Graph<Long, Double, Double> initializedGraph =
            graph.mapVertices(vertex -> 1.0);

        return pageRankImpl.run(initializedGraph);
    }
}
```
399
400
### Testing and Validation
401
402
```java
403
// Use data classes for consistent testing
404
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
405
406
// Get standard test data
407
DataSet<Vertex<Long, Double>> vertices = PageRankData.getDefaultVertexDataSet(env);
408
DataSet<Edge<Long, Double>> edges = PageRankData.getDefaultEdgeDataSet(env);
409
Graph<Long, Double, Double> testGraph = Graph.fromDataSet(vertices, edges, env);
410
411
// Run algorithm
412
PageRank<Long> algorithm = new PageRank<>(0.85, 10);
413
DataSet<Vertex<Long, Double>> results = algorithm.run(testGraph);
414
415
// Validate against expected results
416
String expected = PageRankData.getExpectedResult();
417
String actual = DataSetUtils.collect(results).toString();
418
assertEquals(expected, actual);
419
```