# Legacy Examples

The legacy examples package contains standalone implementations of graph algorithms demonstrating different Flink Gelly programming models. These examples serve as educational material and reference implementations, showing scatter-gather, gather-sum-apply (GSA), and vertex-centric (Pregel) approaches to graph processing.

## Capabilities

### Standalone Example Programs

#### Single Source Shortest Paths (Scatter-Gather)

Computes shortest paths from a single source vertex using the scatter-gather programming model.

```java { .api }
/**
 * Single Source Shortest Paths using scatter-gather iteration
 * Demonstrates message passing between vertices for distance computation
 */
public class SingleSourceShortestPaths implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the SSSP algorithm and usage
     */
    public String getDescription();
}
```

**Command-line Usage:**
```bash
flink run -c org.apache.flink.graph.examples.SingleSourceShortestPaths \
  flink-gelly-examples_2.10-1.3.3.jar \
  <source_vertex_id> <input_edges_path> <output_path> <num_iterations>
```

**Usage Example:**
```bash
flink run -c org.apache.flink.graph.examples.SingleSourceShortestPaths \
  flink-gelly-examples_2.10-1.3.3.jar \
  1 edges.csv shortest_paths.csv 10
```
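
To make the scatter-gather model concrete, here is a minimal plain-Java sketch of the same idea without any Flink dependency. The class `ScatterGatherSsspSketch` and its `Edge` type are hypothetical names introduced for illustration; they are not part of Gelly. As a simplification, every reachable vertex sends messages in every superstep, whereas an engine would typically only activate vertices whose value changed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of scatter-gather SSSP (hypothetical, not the Gelly implementation). */
final class ScatterGatherSsspSketch {

    /** A directed, weighted edge. */
    static final class Edge {
        final long source;
        final long target;
        final double weight;

        Edge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Runs at most maxIterations supersteps and returns the distance per vertex id. */
    static Map<Long, Double> run(List<Edge> edges, long sourceId, int maxIterations) {
        // Every vertex starts at infinity, except the source at distance 0.
        Map<Long, Double> distance = new HashMap<>();
        for (Edge e : edges) {
            distance.put(e.source, Double.POSITIVE_INFINITY);
            distance.put(e.target, Double.POSITIVE_INFINITY);
        }
        distance.put(sourceId, 0.0);

        for (int superstep = 0; superstep < maxIterations; superstep++) {
            // Scatter: each reachable vertex sends (own distance + edge weight) along its out-edges.
            Map<Long, List<Double>> inbox = new HashMap<>();
            for (Edge e : edges) {
                double d = distance.get(e.source);
                if (d < Double.POSITIVE_INFINITY) {
                    inbox.computeIfAbsent(e.target, k -> new ArrayList<>()).add(d + e.weight);
                }
            }
            // Gather: each vertex keeps the minimum of its current value and its messages.
            boolean changed = false;
            for (Map.Entry<Long, List<Double>> entry : inbox.entrySet()) {
                double best = distance.get(entry.getKey());
                for (double candidate : entry.getValue()) {
                    best = Math.min(best, candidate);
                }
                if (best < distance.get(entry.getKey())) {
                    distance.put(entry.getKey(), best);
                    changed = true;
                }
            }
            if (!changed) {
                break; // no vertex improved, so the distances have converged
            }
        }
        return distance;
    }
}
```

The `maxIterations` argument plays the same role as the `<num_iterations>` command-line parameter: it bounds the number of supersteps even if the distances have not yet converged.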

#### Single Source Shortest Paths (GSA)

Computes shortest paths using the gather-sum-apply programming model.

```java { .api }
/**
 * Single Source Shortest Paths using gather-sum-apply iteration
 * Demonstrates GSA programming model with gather, sum, and apply phases
 */
public class GSASingleSourceShortestPaths implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the GSA SSSP algorithm and usage
     */
    public String getDescription();
}
```
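
The three GSA phases can be illustrated in plain Java as three small functions. This is a sketch under assumptions: `GsaSsspSketch`, its `Edge` type, and the method names are hypothetical and do not use the Gelly API. For shortest paths, the "sum" phase is a minimum rather than an arithmetic sum.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of gather-sum-apply SSSP (hypothetical, not the Gelly implementation). */
final class GsaSsspSketch {

    /** A directed, weighted edge. */
    static final class Edge {
        final long source;
        final long target;
        final double weight;

        Edge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Gather: the candidate distance offered by one in-neighbour over one edge. */
    static double gather(double neighborDistance, double edgeWeight) {
        return neighborDistance + edgeWeight;
    }

    /** Sum: combines two candidates; for shortest paths this is the minimum. */
    static double sum(double a, double b) {
        return Math.min(a, b);
    }

    /** Apply: the new vertex value, which never gets worse than the current one. */
    static double apply(double combined, double current) {
        return Math.min(combined, current);
    }

    /** Runs at most maxIterations rounds and returns the distance per vertex id. */
    static Map<Long, Double> run(List<Edge> edges, long sourceId, int maxIterations) {
        Map<Long, Double> distance = new HashMap<>();
        for (Edge e : edges) {
            distance.put(e.source, Double.POSITIVE_INFINITY);
            distance.put(e.target, Double.POSITIVE_INFINITY);
        }
        distance.put(sourceId, 0.0);

        for (int i = 0; i < maxIterations; i++) {
            // Gather and sum: one combined candidate per vertex that has in-edges.
            Map<Long, Double> combined = new HashMap<>();
            for (Edge e : edges) {
                double candidate = gather(distance.get(e.source), e.weight);
                combined.merge(e.target, candidate, GsaSsspSketch::sum);
            }
            // Apply: update only the vertices whose distance improves.
            boolean changed = false;
            for (Map.Entry<Long, Double> entry : combined.entrySet()) {
                double current = distance.get(entry.getKey());
                double updated = apply(entry.getValue(), current);
                if (updated < current) {
                    distance.put(entry.getKey(), updated);
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged
            }
        }
        return distance;
    }
}
```

Compared with scatter-gather, the vertex here pulls values from its in-neighbours instead of receiving pushed messages, and the per-vertex state passed between phases is a single combined value.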

#### Single Source Shortest Paths (Pregel)

Computes shortest paths using the vertex-centric (Pregel) programming model.

```java { .api }
/**
 * Single Source Shortest Paths using vertex-centric (Pregel) computation
 * Demonstrates Pregel programming model with compute functions and message combiners
 */
public class PregelSSSP implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the Pregel SSSP algorithm and usage
     */
    public String getDescription();
}
```

**Inner Classes:**
```java { .api }
/**
 * Vertex compute function for Pregel SSSP implementation
 * Processes incoming messages and updates vertex state
 */
public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
    public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) throws Exception;
}

/**
 * Message combiner for Pregel SSSP to reduce message overhead
 * Combines multiple messages to the same vertex
 */
public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
    public void combineMessages(MessageIterator<Double> messages) throws Exception;
}
```
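
The interplay between a compute function and a message combiner can be sketched in plain Java as follows. `PregelSsspSketch` and its `Edge` type are hypothetical and independent of Gelly; the sketch assumes the combiner keeps only the smallest message per target vertex, which is the natural choice for shortest paths.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration of Pregel-style SSSP with a min-combiner (hypothetical). */
final class PregelSsspSketch {

    /** A directed, weighted edge. */
    static final class Edge {
        final long source;
        final long target;
        final double weight;

        Edge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Runs at most maxSupersteps supersteps and returns the distance per vertex id. */
    static Map<Long, Double> run(List<Edge> edges, long sourceId, int maxSupersteps) {
        Map<Long, Double> value = new HashMap<>();
        for (Edge e : edges) {
            value.put(e.source, Double.POSITIVE_INFINITY);
            value.put(e.target, Double.POSITIVE_INFINITY);
        }
        value.put(sourceId, Double.POSITIVE_INFINITY);

        // Thanks to the combiner, the inbox holds at most one message per vertex.
        Map<Long, Double> inbox = new HashMap<>();
        for (int superstep = 0; superstep < maxSupersteps; superstep++) {
            Map<Long, Double> outbox = new HashMap<>();
            for (Map.Entry<Long, Double> vertex : value.entrySet()) {
                long id = vertex.getKey();
                // Compute: the source proposes 0 in the first superstep,
                // every other vertex proposes its combined incoming message.
                double candidate = (superstep == 0 && id == sourceId)
                        ? 0.0
                        : inbox.getOrDefault(id, Double.POSITIVE_INFINITY);
                if (candidate < vertex.getValue()) {
                    vertex.setValue(candidate);
                    for (Edge e : edges) {
                        if (e.source == id) {
                            // Combiner: keep only the minimum message per target vertex.
                            outbox.merge(e.target, candidate + e.weight, Double::min);
                        }
                    }
                }
            }
            if (outbox.isEmpty()) {
                break; // no messages in flight, so the computation halts
            }
            inbox = outbox;
        }
        return value;
    }
}
```

Unlike the scatter-gather sketch style, a vertex here updates its own value and sends messages in the same step, and only vertices whose value improved produce any messages.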

#### Music Profiles Analysis

Complex example demonstrating mixed DataSet and Gelly API usage for user-song bipartite graph analysis.

```java { .api }
/**
 * Music Profiles analysis using bipartite graphs and community detection
 * Demonstrates integration of DataSet operations with graph processing
 */
public class MusicProfiles implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <input user song triplets path> <input song mismatches path> <output top tracks path> <playcount threshold> <output communities path> <num iterations>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the music profiles analysis pipeline
     */
    public String getDescription();
}
```

**Command-line Parameters:**
```bash
<input_user_song_triplets_path> <input_song_mismatches_path> <output_top_tracks_path>
<playcount_threshold> <output_communities_path> <num_iterations>
```

**Public Transformation Classes:**
```java { .api }
/**
 * Extract mismatch song IDs from mismatch data
 * Maps mismatch records to song ID strings
 */
public static final class ExtractMismatchSongIds implements MapFunction<String, String> {
    public String map(String value) throws Exception;
}

/**
 * Filter out songs that appear in the mismatch list
 * Removes problematic songs from the analysis dataset
 */
public static final class FilterOutMismatches implements FilterFunction<Tuple3<String, String, Integer>> {
    public boolean filter(Tuple3<String, String, Integer> value) throws Exception;
}

/**
 * Filter song vertices from user-song bipartite graph
 * Identifies vertices representing songs vs users
 */
public static final class FilterSongNodes implements FilterFunction<Vertex<String, Double>> {
    public boolean filter(Vertex<String, Double> vertex) throws Exception;
}

/**
 * Get top song per user based on play counts
 * Finds the most played song for each user
 */
public static final class GetTopSongPerUser implements GroupReduceFunction<Tuple3<String, String, Integer>, Tuple2<String, String>> {
    public void reduce(Iterable<Tuple3<String, String, Integer>> values, Collector<Tuple2<String, String>> out) throws Exception;
}

/**
 * Create edges between users who share similar music preferences
 * Builds user similarity graph based on common top songs
 */
public static final class CreateSimilarUserEdges implements GroupReduceFunction<Tuple2<String, String>, Edge<String, NullValue>> {
    public void reduce(Iterable<Tuple2<String, String>> values, Collector<Edge<String, NullValue>> out) throws Exception;
}
```
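
The last two transformations, picking each user's most played song and linking users who share it, can be illustrated with ordinary collections. This is a rough sketch with hypothetical names (`MusicProfilesSketch`, `Play`); tie-breaking between equally played songs and the orientation of the generated edges are assumptions, not documented behaviour of the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration of the top-song and similar-user steps (hypothetical). */
final class MusicProfilesSketch {

    /** One (user, song, play count) triplet. */
    static final class Play {
        final String user;
        final String song;
        final int count;

        Play(String user, String song, int count) {
            this.user = user;
            this.song = song;
            this.count = count;
        }
    }

    /** Returns user -> most played song, sorted by user id. */
    static Map<String, String> topSongPerUser(List<Play> plays) {
        Map<String, Play> best = new HashMap<>();
        for (Play p : plays) {
            // Keep the play with the higher count; ties keep the earlier record (an assumption).
            best.merge(p.user, p, (kept, next) -> next.count > kept.count ? next : kept);
        }
        Map<String, String> topSongs = new TreeMap<>();
        for (Play p : best.values()) {
            topSongs.put(p.user, p.song);
        }
        return topSongs;
    }

    /** Returns one {userA, userB} pair for every two users with the same top song. */
    static List<String[]> similarUserEdges(Map<String, String> topSongs) {
        // Group users by their top song.
        Map<String, List<String>> usersBySong = new TreeMap<>();
        for (Map.Entry<String, String> entry : topSongs.entrySet()) {
            usersBySong.computeIfAbsent(entry.getValue(), song -> new ArrayList<>())
                    .add(entry.getKey());
        }
        // Emit an edge for each unordered pair within a group.
        List<String[]> edges = new ArrayList<>();
        for (List<String> users : usersBySong.values()) {
            for (int i = 0; i < users.size(); i++) {
                for (int j = i + 1; j < users.size(); j++) {
                    edges.add(new String[] {users.get(i), users.get(j)});
                }
            }
        }
        return edges;
    }
}
```

The resulting user-user edges are what a community detection step would then operate on.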

#### Euclidean Graph Weighting

Demonstrates graph transformations and geometric computations with triplet operations.

```java { .api }
/**
 * Euclidean Graph Weighting example showing graph transformations
 * Demonstrates triplet operations and custom distance computations
 */
public class EuclideanGraphWeighing implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Command-line arguments: <input vertices path> <input edges path> <output path>
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the Euclidean weighting transformation
     */
    public String getDescription();
}
```

**Public Utility Classes:**
```java { .api }
/**
 * 2D Point class for geometric computations
 * Represents vertex positions in Euclidean space
 */
public static class Point implements Serializable {
    public double x;
    public double y;

    public Point(double x, double y);

    /**
     * Calculate Euclidean distance between two points
     * @param other Other point to calculate distance to
     * @return Euclidean distance as double
     */
    public double euclideanDistance(Point other);
}
```
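
As a small illustration of the weighting step, the sketch below assigns each edge the Euclidean distance between the positions of its two endpoints. The class `EuclideanWeighingSketch`, its `weigh` method, and the array-based edge representation are hypothetical; the nested `Point` only mirrors the shape of the class listed above.

```java
import java.util.Map;

/** Plain-Java illustration of weighting edges by endpoint distance (hypothetical). */
final class EuclideanWeighingSketch {

    /** A 2D position attached to a vertex. */
    static final class Point {
        final double x;
        final double y;

        Point(double x, double y) {
            this.x = x;
            this.y = y;
        }

        /** Straight-line distance to another point. */
        double euclideanDistance(Point other) {
            double dx = x - other.x;
            double dy = y - other.y;
            return Math.sqrt(dx * dx + dy * dy);
        }
    }

    /**
     * Computes one weight per edge.
     * Each edge is a {sourceId, targetId} pair; positions maps vertex id to its point.
     */
    static double[] weigh(Map<Long, Point> positions, long[][] edges) {
        double[] weights = new double[edges.length];
        for (int i = 0; i < edges.length; i++) {
            // This is the "triplet" view: an edge together with both endpoint values.
            Point source = positions.get(edges[i][0]);
            Point target = positions.get(edges[i][1]);
            weights[i] = source.euclideanDistance(target);
        }
        return weights;
    }
}
```

For example, an edge between vertices placed at (0, 0) and (3, 4) receives a weight of 5.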

#### Incremental SSSP

Advanced example showing incremental shortest path updates when edges are removed from the graph.

```java { .api }
/**
 * Incremental Single Source Shortest Paths with edge removal
 * Demonstrates dynamic graph updates and incremental computation
 */
public class IncrementalSSSP implements ProgramDescription {
    /**
     * Main entry point for command-line execution
     * @param args Complex parameter list for incremental SSSP
     * @throws Exception if execution fails
     */
    public static void main(String[] args) throws Exception;

    /**
     * Program description for help and documentation
     * @return Description of the incremental SSSP algorithm
     */
    public String getDescription();

    /**
     * Check if an edge is part of the shortest path tree
     * @param edgeToBeRemoved Edge to check for SSSP membership
     * @param edgesInSSSP DataSet of edges currently in the SSSP tree
     * @return True if edge is in SSSP, false otherwise
     * @throws Exception if check fails
     */
    public static boolean isInSSSP(Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception;
}
```

**Inner Algorithm Classes:**
```java { .api }
/**
 * Message passing function for invalidating affected paths
 * Sends invalidation messages when edges are removed
 */
public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
    public void sendMessages(Vertex<Long, Double> vertex) throws Exception;
}

/**
 * Vertex update function for recalculating distances
 * Updates vertex distances based on invalidation messages
 */
public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
    public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}
```
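
The idea behind the incremental update can be sketched without Flink: removing an edge only matters if it belongs to the shortest path tree, and in that case the vertices reached through it need their distances invalidated. The sketch below is one possible reading of that idea, not the example's actual logic. `IncrementalSsspSketch` and `invalidatedVertices` are hypothetical names, and matching edges by source and target only (ignoring weights) is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Plain-Java illustration of edge-removal invalidation (hypothetical). */
final class IncrementalSsspSketch {

    /** Returns true if the {source, target} edge is part of the shortest path tree. */
    static boolean isInSssp(long[] removed, List<long[]> treeEdges) {
        for (long[] e : treeEdges) {
            if (e[0] == removed[0] && e[1] == removed[1]) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the vertices whose distances can no longer be trusted after the removal:
     * the target of the removed edge and everything below it in the tree.
     */
    static Set<Long> invalidatedVertices(long[] removed, List<long[]> treeEdges) {
        Set<Long> invalidated = new LinkedHashSet<>();
        if (!isInSssp(removed, treeEdges)) {
            return invalidated; // removal does not affect any shortest path
        }
        Deque<Long> frontier = new ArrayDeque<>();
        frontier.add(removed[1]);
        while (!frontier.isEmpty()) {
            long vertex = frontier.remove();
            if (invalidated.add(vertex)) {
                // Propagate the invalidation along tree edges leaving this vertex.
                for (long[] e : treeEdges) {
                    if (e[0] == vertex) {
                        frontier.add(e[1]);
                    }
                }
            }
        }
        return invalidated;
    }
}
```

After invalidation, only the affected vertices need their distances recomputed, which is what makes the approach incremental rather than a full rerun.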

### Reusable Algorithm Classes

#### PageRank Algorithm Implementation

Generic PageRank algorithm implementation for programmatic usage.

```java { .api }
/**
 * Generic PageRank algorithm implementation using scatter-gather iteration
 * Can be used as a reusable component in larger applications
 */
public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    /**
     * Create PageRank algorithm with parameters
     * @param beta Damping factor for random walk (typically 0.85)
     * @param maxIterations Maximum number of iterations
     */
    public PageRank(double beta, int maxIterations);

    /**
     * Execute PageRank on the input graph
     * @param network Input graph with Double vertex and edge values
     * @return DataSet of vertices with PageRank scores
     * @throws Exception if algorithm execution fails
     */
    public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}
```

**Inner Algorithm Classes:**
```java { .api }
/**
 * Message passing function for PageRank score distribution
 * Sends rank messages along outgoing edges
 */
public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
    public void sendMessages(Vertex<K, Double> vertex) throws Exception;
}

/**
 * Vertex update function for PageRank score computation
 * Updates vertex ranks based on incoming messages
 */
public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
    public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}

/**
 * Initialization function for vertex weights
 * Sets initial PageRank values for all vertices
 */
public static final class InitWeights implements MapFunction<Vertex<K, NullValue>, Double> {
    public Double map(Vertex<K, NullValue> value) throws Exception;
}
```
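
For readers who want to see the arithmetic that the `beta` and `maxIterations` parameters control, here is a minimal plain-Java PageRank iteration. It is a sketch under assumptions, not the Gelly implementation: `PageRankSketch` is a hypothetical name, vertices are plain indices `0..vertexCount-1` instead of a generic key type, every out-edge of a vertex is assumed to carry an equal share of its rank, and every vertex is assumed to have at least one out-edge.

```java
import java.util.Arrays;

/** Plain-Java illustration of the PageRank update rule (hypothetical). */
final class PageRankSketch {

    /**
     * Runs a fixed number of iterations.
     * Each edge is a {source, target} pair of vertex indices.
     */
    static double[] run(int vertexCount, int[][] edges, double beta, int maxIterations) {
        int[] outDegree = new int[vertexCount];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }

        // Start from a uniform distribution.
        double[] rank = new double[vertexCount];
        Arrays.fill(rank, 1.0 / vertexCount);

        for (int iteration = 0; iteration < maxIterations; iteration++) {
            // Teleport term: every vertex receives (1 - beta) / N.
            double[] next = new double[vertexCount];
            Arrays.fill(next, (1.0 - beta) / vertexCount);
            // Message term: each vertex passes beta * rank / outDegree along every out-edge.
            for (int[] e : edges) {
                next[e[1]] += beta * rank[e[0]] / outDegree[e[0]];
            }
            rank = next;
        }
        return rank;
    }
}
```

On a directed three-vertex cycle the ranks stay uniform at 1/3 each, which is a handy sanity check: with no dangling vertices, the ranks always sum to 1.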

#### GSA PageRank Implementation

PageRank implementation using the gather-sum-apply programming model.

```java { .api }
/**
 * PageRank algorithm using gather-sum-apply iteration
 * Alternative implementation demonstrating GSA programming model
 */
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
    /**
     * Create GSA PageRank algorithm with parameters
     * @param beta Damping factor for random walk
     * @param maxIterations Maximum number of iterations
     */
    public GSAPageRank(double beta, int maxIterations);

    /**
     * Execute GSA PageRank on the input graph
     * @param network Input graph with Double vertex and edge values
     * @return DataSet of vertices with PageRank scores
     * @throws Exception if algorithm execution fails
     */
    public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}
```

**Inner GSA Classes:**
```java { .api }
/**
 * Gather function for collecting neighbor ranks
 * Gathers PageRank values from neighboring vertices
 */
public static final class GatherRanks implements GatherFunction<Double, Double, Double> {
    public Double gather(Neighbor<Double, Double> neighbor) throws Exception;
}

/**
 * Sum function for aggregating gathered ranks
 * Sums all gathered rank values for each vertex
 */
public static final class SumRanks implements SumFunction<Double, Double, Double> {
    public Double sum(Double newValue, Double currentValue) throws Exception;
}

/**
 * Apply function for updating vertex ranks
 * Applies PageRank formula with damping factor
 */
public static final class UpdateRanks<K> implements ApplyFunction<K, Double, Double> {
    public void apply(Double summedRanks, Vertex<K, Double> vertex) throws Exception;
}
```

### Data Generators

The examples package includes data classes that supply small sample datasets and expected results for testing and demonstration:

```java { .api }
/**
 * PageRank test data generator
 * Provides sample graphs and expected results for PageRank algorithm
 */
public class PageRankData {
    /**
     * Get default edge dataset for PageRank testing
     * @param env Flink ExecutionEnvironment
     * @return DataSet of sample edges
     */
    public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);

    // Public constants for test data
    public static final String EDGES = "...";
    public static final String RANKS_AFTER_3_ITERATIONS = "...";
}

/**
 * Single Source Shortest Paths test data generator
 * Provides sample graphs and expected SSSP results
 */
public class SingleSourceShortestPathsData {
    /**
     * Get default edge dataset for SSSP testing
     * @param env Flink ExecutionEnvironment
     * @return DataSet of sample edges with weights
     */
    public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);

    // Public constants
    public static final Long SRC_VERTEX_ID = 1L;
    public static final String EDGES = "...";
    public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "...";
}

/**
 * Music profiles test data generator
 * Provides sample user-song interaction data
 */
public class MusicProfilesData {
    /**
     * Get sample user-song triplet data
     * @param env Flink ExecutionEnvironment
     * @return DataSet of user-song-playcount triplets
     */
    public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env);

    /**
     * Get sample mismatch data for filtering
     * @param env Flink ExecutionEnvironment
     * @return DataSet of problematic song IDs
     */
    public static DataSet<String> getMismatches(ExecutionEnvironment env);
}

/**
 * Euclidean graph test data generator
 * Provides sample geometric graphs with point coordinates
 */
public class EuclideanGraphData {
    /**
     * Get default vertex dataset with 2D coordinates
     * @param env Flink ExecutionEnvironment
     * @return DataSet of vertices with Point values
     */
    public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env);

    /**
     * Get corresponding edge dataset
     * @param env Flink ExecutionEnvironment
     * @return DataSet of edges with distance weights
     */
    public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}
```

### Scala Examples

The package also includes Scala implementations demonstrating the Scala Graph API:

```scala { .api }
/**
 * Connected Components implementation in Scala
 * Demonstrates Scala Graph API usage with GSA
 */
object ConnectedComponents {
  /**
   * Main entry point for Scala Connected Components
   * @param args Command-line arguments: <edge path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}

/**
 * Single Source Shortest Paths in Scala using scatter-gather
 * Shows Scala functional programming patterns with Gelly
 */
object SingleSourceShortestPaths {
  /**
   * Main entry point for Scala SSSP
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}

/**
 * GSA Single Source Shortest Paths in Scala
 * Demonstrates GSA programming model in Scala
 */
object GSASingleSourceShortestPaths {
  /**
   * Main entry point for Scala GSA SSSP
   * @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
   */
  def main(args: Array[String]): Unit
}
```

## Types

```java { .api }
// Graph algorithm interfaces
interface GraphAlgorithm<K, VV, EV, T> extends Serializable {
    // T is the full result type, e.g. DataSet<Vertex<K, Double>> for PageRank
    T run(Graph<K, VV, EV> input) throws Exception;
}

interface ProgramDescription {
    String getDescription();
}

// Gelly programming model interfaces
abstract class ComputeFunction<K, VV, EV, M> {
    public abstract void compute(Vertex<K, VV> vertex, MessageIterator<M> messages) throws Exception;
}

abstract class MessagingFunction<K, VV, EV, M> {
    public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
}

abstract class VertexUpdateFunction<K, VV, M> {
    public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;
}

// GSA interfaces
interface GatherFunction<VV, EV, M> {
    M gather(Neighbor<VV, EV> neighbor) throws Exception;
}

interface SumFunction<VV, EV, M> {
    M sum(M newValue, M currentValue) throws Exception;
}

interface ApplyFunction<K, VV, M> {
    void apply(M newValue, Vertex<K, VV> vertex) throws Exception;
}

// Flink DataSet API types
class DataSet<T> {
    // Distributed dataset operations
}

class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;
}

class Tuple3<T0, T1, T2> {
    public T0 f0;
    public T1 f1;
    public T2 f2;
}
```