The legacy examples package contains standalone implementations of graph algorithms demonstrating different Flink Gelly programming models. These examples serve as educational material and reference implementations, showing scatter-gather, gather-sum-apply (GSA), and vertex-centric (Pregel) approaches to graph processing.
Computes shortest paths from a single source vertex using the scatter-gather programming model.
/**
* Single Source Shortest Paths using scatter-gather iteration.
* Demonstrates message passing between vertices for distance computation.
*
* @see GSASingleSourceShortestPaths gather-sum-apply variant of the same algorithm
* @see PregelSSSP vertex-centric (Pregel) variant of the same algorithm
*/
public class SingleSourceShortestPaths implements ProgramDescription {
/**
* Main entry point for command-line execution.
*
* @param args positional command-line arguments, in order:
*     {@code <source vertex id> <input edges path> <output path> <num iterations>}
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Returns the program description used for help output and documentation.
*
* @return description of the scatter-gather SSSP algorithm and its usage
*/
public String getDescription();
}
Command-line Usage:
flink run flink-gelly-examples_2.10-1.3.3.jar \
org.apache.flink.graph.examples.SingleSourceShortestPaths \
<source_vertex_id> <input_edges_path> <output_path> <num_iterations>

Usage Example:
flink run flink-gelly-examples_2.10-1.3.3.jar \
org.apache.flink.graph.examples.SingleSourceShortestPaths \
1 edges.csv shortest_paths.csv 10

Computes shortest paths using the gather-sum-apply programming model.
/**
* Single Source Shortest Paths using gather-sum-apply iteration.
* Demonstrates the GSA programming model with its gather, sum, and apply phases.
*
* @see SingleSourceShortestPaths scatter-gather variant of the same algorithm
* @see PregelSSSP vertex-centric (Pregel) variant of the same algorithm
*/
public class GSASingleSourceShortestPaths implements ProgramDescription {
/**
* Main entry point for command-line execution.
*
* @param args positional command-line arguments, in order:
*     {@code <source vertex id> <input edges path> <output path> <num iterations>}
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Returns the program description used for help output and documentation.
*
* @return description of the GSA SSSP algorithm and its usage
*/
public String getDescription();
}
Computes shortest paths using the vertex-centric (Pregel) programming model.
/**
* Single Source Shortest Paths using vertex-centric (Pregel) computation.
* Demonstrates the Pregel programming model with compute functions and message combiners.
*
* @see SingleSourceShortestPaths scatter-gather variant of the same algorithm
* @see GSASingleSourceShortestPaths gather-sum-apply variant of the same algorithm
*/
public class PregelSSSP implements ProgramDescription {
/**
* Main entry point for command-line execution.
*
* @param args positional command-line arguments, in order:
*     {@code <source vertex id> <input edges path> <output path> <num iterations>}
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Returns the program description used for help output and documentation.
*
* @return description of the Pregel SSSP algorithm and its usage
*/
public String getDescription();
}
Inner Classes:
/**
* Vertex compute function for the Pregel SSSP implementation.
* Processes incoming messages and updates vertex state.
*
* <p>Type arguments (per {@code ComputeFunction<K, VV, EV, M>}): {@code Long} vertex
* keys, {@code Double} vertex values, {@code Double} edge values, {@code Double} messages.
*/
public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
/**
* Processes the messages addressed to one vertex and updates its state.
*
* @param vertex the vertex being computed
* @param messages incoming messages for this vertex
* @throws Exception if the computation fails
*/
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) throws Exception;
}
/**
* Message combiner for Pregel SSSP that reduces message overhead.
* Combines multiple messages addressed to the same vertex.
*/
public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
/**
* Combines the given messages for one target vertex.
*
* @param messages the messages to combine
* @throws Exception if combining fails
*/
public void combineMessages(MessageIterator<Double> messages) throws Exception;
}
Complex example demonstrating mixed DataSet and Gelly API usage for user-song bipartite graph analysis.
/**
* Music profiles analysis using bipartite graphs and community detection.
* Demonstrates integration of DataSet operations with graph processing.
*/
public class MusicProfiles implements ProgramDescription {
/**
* Main entry point for command-line execution.
*
* @param args positional command-line arguments, in order:
*     {@code <input_user_song_triplets_path> <input_song_mismatches_path>
*     <output_top_tracks_path> <playcount_threshold> <output_communities_path>
*     <num_iterations>}
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Returns the program description used for help output and documentation.
*
* @return description of the music profiles analysis pipeline
*/
public String getDescription();
}
Command-line Parameters:
<input_user_song_triplets_path> <input_song_mismatches_path> <output_top_tracks_path>
<playcount_threshold> <output_communities_path> <num_iterations>

Public Transformation Classes:
/**
* Extracts song IDs from mismatch data.
* Maps each mismatch record to a song ID string.
*/
public static final class ExtractMismatchSongIds implements MapFunction<String, String> {
/**
* Extracts the song ID from one mismatch record.
*
* @param value one raw mismatch record
* @return the song ID referenced by the record
* @throws Exception if the record cannot be processed
*/
public String map(String value) throws Exception;
}
/**
* Filters out songs that appear in the mismatch list.
* Removes problematic songs from the analysis dataset.
*
* <p>Input triplets have the same type as {@code MusicProfilesData#getUserSongTriplets},
* presumably (user ID, song ID, play count); confirm the field order.
*/
public static final class FilterOutMismatches implements FilterFunction<Tuple3<String, String, Integer>> {
/**
* Decides whether a triplet is kept.
*
* @param value one user-song-playcount triplet
* @return {@code false} if the triplet's song is in the mismatch list, {@code true} to keep it
* @throws Exception if the check fails
*/
public boolean filter(Tuple3<String, String, Integer> value) throws Exception;
}
/**
* Selects the song vertices of the user-song bipartite graph.
* Distinguishes vertices representing songs from those representing users.
*/
public static final class FilterSongNodes implements FilterFunction<Vertex<String, Double>> {
/**
* Decides whether a vertex is kept.
*
* @param vertex a vertex of the bipartite graph
* @return {@code true} if the vertex represents a song, {@code false} if it represents a user
* @throws Exception if the check fails
*/
public boolean filter(Vertex<String, Double> vertex) throws Exception;
}
/**
* Gets the top song per user based on play counts.
* Finds the most played song for each user.
*/
public static final class GetTopSongPerUser implements GroupReduceFunction<Tuple3<String, String, Integer>, Tuple2<String, String>> {
/**
* Reduces one group of triplets to that user's most played song.
*
* @param values user-song-playcount triplets; presumably grouped by user (TODO confirm)
* @param out receives presumably (user, top song) pairs
* @throws Exception if the reduction fails
*/
public void reduce(Iterable<Tuple3<String, String, Integer>> values, Collector<Tuple2<String, String>> out) throws Exception;
}
/**
* Creates edges between users who share similar music preferences.
* Builds the user similarity graph based on common top songs.
*/
public static final class CreateSimilarUserEdges implements GroupReduceFunction<Tuple2<String, String>, Edge<String, NullValue>> {
/**
* Emits edges connecting the users in one group.
*
* @param values presumably (user, top song) pairs grouped by song (TODO confirm grouping key)
* @param out receives unweighted edges between similar users
* @throws Exception if the reduction fails
*/
public void reduce(Iterable<Tuple2<String, String>> values, Collector<Edge<String, NullValue>> out) throws Exception;
}
Demonstrates graph transformations and geometric computations with triplet operations.
/**
* Euclidean graph weighting example showing graph transformations.
* Demonstrates triplet operations and custom distance computations.
*/
public class EuclideanGraphWeighing implements ProgramDescription {
/**
* Main entry point for command-line execution.
*
* @param args positional command-line arguments, in order:
*     {@code <input vertices path> <input edges path> <output path>}
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Returns the program description used for help output and documentation.
*
* @return description of the Euclidean weighting transformation
*/
public String getDescription();
}
Public Utility Classes:
/**
* 2D point used as a vertex value for geometric computations.
* Represents vertex positions in Euclidean space.
*
* <p>NOTE(review): Flink only treats a class as a POJO type when it has a public
* no-argument constructor, and none is listed here. Confirm whether the class declares
* one; otherwise Point values are serialized as a generic (Kryo) type.
*/
public static class Point implements Serializable {
/** X coordinate. */
public double x;
/** Y coordinate. */
public double y;
/**
* Creates a point at the given coordinates.
*
* @param x the x coordinate
* @param y the y coordinate
*/
public Point(double x, double y);
/**
* Calculates the Euclidean distance between this point and another.
*
* @param other point to calculate the distance to
* @return Euclidean distance as a double
*/
public double euclideanDistance(Point other);
}
Advanced example showing incremental shortest path updates when edges are removed from the graph.
/**
* Incremental Single Source Shortest Paths with edge removal.
* Demonstrates dynamic graph updates and incremental computation.
*/
public class IncrementalSSSP implements ProgramDescription {
/**
* Main entry point for command-line execution.
*
* @param args positional command-line arguments. NOTE(review): the expected argument
*     list is not spelled out in this reference; confirm it against the example's
*     usage message.
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Returns the program description used for help output and documentation.
*
* @return description of the incremental SSSP algorithm
*/
public String getDescription();
/**
* Checks whether an edge is part of the shortest-path tree.
*
* <p>NOTE(review): the check takes a whole {@code DataSet} and declares
* {@code Exception}. It presumably evaluates the DataSet eagerly (for example by
* counting or collecting matches), which triggers a job execution. Confirm this
* before calling it repeatedly.
*
* @param edgeToBeRemoved edge to check for SSSP membership
* @param edgesInSSSP DataSet of edges currently in the SSSP tree
* @return {@code true} if the edge is in the SSSP tree, {@code false} otherwise
* @throws Exception if the check fails
*/
public static boolean isInSSSP(Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception;
}
Inner Algorithm Classes:
/**
* Messaging function that invalidates affected paths.
* Sends invalidation messages when edges are removed.
*/
public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
/**
* Sends invalidation messages on behalf of one vertex.
*
* @param vertex the sending vertex
* @throws Exception if sending fails
*/
public void sendMessages(Vertex<Long, Double> vertex) throws Exception;
}
/**
* Vertex update function that recalculates distances.
* Updates vertex distances based on invalidation messages.
*/
public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
/**
* Updates one vertex's distance from its incoming messages.
*
* @param vertex the vertex to update
* @param inMessages incoming invalidation/distance messages
* @throws Exception if the update fails
*/
public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}
Generic PageRank algorithm implementation for programmatic usage.
/**
* Generic PageRank algorithm implementation using scatter-gather iteration.
* Can be used as a reusable component in larger applications.
*
* @param <K> vertex key type
* @see GSAPageRank gather-sum-apply variant of the same algorithm
*/
public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
/**
* Creates the PageRank algorithm with the given parameters.
*
* @param beta damping factor for the random walk (typically 0.85)
* @param maxIterations maximum number of iterations
*/
public PageRank(double beta, int maxIterations);
/**
* Executes PageRank on the input graph.
*
* @param network input graph with {@code Double} vertex and edge values
* @return DataSet of vertices carrying their PageRank scores
* @throws Exception if algorithm execution fails
*/
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}
Inner Algorithm Classes:
/**
* Messaging function that distributes PageRank scores.
* Sends rank messages along outgoing edges.
*
* @param <K> vertex key type
*/
public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
/**
* Sends rank messages on behalf of one vertex.
*
* @param vertex the sending vertex, whose value is its current rank
* @throws Exception if sending fails
*/
public void sendMessages(Vertex<K, Double> vertex) throws Exception;
}
/**
* Vertex update function that computes PageRank scores.
* Updates vertex ranks based on incoming messages.
*
* @param <K> vertex key type
*/
public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
/**
* Updates one vertex's rank from its incoming rank messages.
*
* @param vertex the vertex to update
* @param inMessages incoming rank messages
* @throws Exception if the update fails
*/
public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}
/**
* Initialization function for vertex weights.
* Sets initial PageRank values for all vertices.
*
* @param <K> vertex key type. It is declared on this class itself because a static
*     nested class cannot refer to the type parameter of the enclosing
*     {@code PageRank<K>}; this matches the sibling RankMessenger and VertexRankUpdater.
*/
public static final class InitWeights<K> implements MapFunction<Vertex<K, NullValue>, Double> {
/**
* Computes the initial rank of one vertex.
*
* @param value the vertex to initialize
* @return the vertex's initial PageRank value
* @throws Exception if the mapping fails
*/
public Double map(Vertex<K, NullValue> value) throws Exception;
}
PageRank implementation using the gather-sum-apply programming model.
/**
* PageRank algorithm using gather-sum-apply iteration.
* Alternative implementation demonstrating the GSA programming model.
*
* @param <K> vertex key type
* @see PageRank scatter-gather variant of the same algorithm
*/
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
/**
* Creates the GSA PageRank algorithm with the given parameters.
*
* @param beta damping factor for the random walk
* @param maxIterations maximum number of iterations
*/
public GSAPageRank(double beta, int maxIterations);
/**
* Executes GSA PageRank on the input graph.
*
* @param network input graph with {@code Double} vertex and edge values
* @return DataSet of vertices carrying their PageRank scores
* @throws Exception if algorithm execution fails
*/
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}
Inner GSA Classes:
/**
* Gather function that collects neighbor ranks.
* Gathers PageRank values from neighboring vertices.
*/
public static final class GatherRanks implements GatherFunction<Double, Double, Double> {
/**
* Produces the rank contribution of one neighbor.
*
* @param neighbor the neighbor whose rank is gathered
* @return the gathered rank value
* @throws Exception if gathering fails
*/
public Double gather(Neighbor<Double, Double> neighbor) throws Exception;
}
/**
* Sum function that aggregates gathered ranks.
* Sums all gathered rank values for each vertex.
*/
public static final class SumRanks implements SumFunction<Double, Double, Double> {
/**
* Combines two gathered rank values.
*
* @param newValue a newly gathered rank value
* @param currentValue the partial sum accumulated so far
* @return the combined sum
* @throws Exception if summing fails
*/
public Double sum(Double newValue, Double currentValue) throws Exception;
}
/**
* Apply function that updates vertex ranks.
* Applies the PageRank formula with the damping factor.
*
* @param <K> vertex key type
*/
public static final class UpdateRanks<K> implements ApplyFunction<K, Double, Double> {
/**
* Updates one vertex's rank from the summed neighbor ranks.
*
* @param summedRanks sum of the ranks gathered from the vertex's neighbors
* @param vertex the vertex to update
* @throws Exception if the update fails
*/
public void apply(Double summedRanks, Vertex<K, Double> vertex) throws Exception;
}
The examples package includes comprehensive data generators for testing and demonstration:
/**
* Test data for the PageRank examples.
* Provides sample graphs and expected results for the PageRank algorithm.
*/
public class PageRankData {
/**
* Returns the default edge dataset for PageRank testing.
*
* @param env Flink ExecutionEnvironment used to create the DataSet
* @return DataSet of sample edges
*/
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
// Public constants for test data (literal values are elided in this reference):
// EDGES holds the sample edges; RANKS_AFTER_3_ITERATIONS holds the expected ranks
// after three iterations.
public static final String EDGES = "...";
public static final String RANKS_AFTER_3_ITERATIONS = "...";
}
/**
* Test data for the Single Source Shortest Paths examples.
* Provides sample graphs and expected SSSP results.
*/
public class SingleSourceShortestPathsData {
/**
* Returns the default edge dataset for SSSP testing.
*
* @param env Flink ExecutionEnvironment used to create the DataSet
* @return DataSet of sample edges with weights
*/
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
// Public constants (string values are elided in this reference):
// SRC_VERTEX_ID is the source vertex used with this data, EDGES the sample edges,
// and RESULTED_SINGLE_SOURCE_SHORTEST_PATHS the expected result.
public static final Long SRC_VERTEX_ID = 1L;
public static final String EDGES = "...";
public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "...";
}
/**
* Test data for the music profiles example.
* Provides sample user-song interaction data.
*/
public class MusicProfilesData {
/**
* Returns sample user-song triplet data.
*
* @param env Flink ExecutionEnvironment used to create the DataSet
* @return DataSet of user-song-playcount triplets
*/
public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env);
/**
* Returns sample mismatch data used for filtering.
*
* @param env Flink ExecutionEnvironment used to create the DataSet
* @return DataSet of records identifying problematic songs
*/
public static DataSet<String> getMismatches(ExecutionEnvironment env);
}
/**
* Test data for the Euclidean graph weighting example.
* Provides sample geometric graphs with point coordinates.
*/
public class EuclideanGraphData {
/**
* Returns the default vertex dataset with 2D coordinates.
*
* @param env Flink ExecutionEnvironment used to create the DataSet
* @return DataSet of vertices whose values are {@code EuclideanGraphWeighing.Point}s
*/
public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env);
/**
* Returns the edge dataset matching the default vertices.
*
* @param env Flink ExecutionEnvironment used to create the DataSet
* @return DataSet of edges with distance weights
*/
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}
The package also includes Scala implementations demonstrating the Scala Graph API:
/**
* Connected Components implementation in Scala.
* Demonstrates Scala Graph API usage with GSA.
*/
object ConnectedComponents {
/**
* Main entry point for Scala Connected Components.
*
* @param args positional command-line arguments, in order:
* {{{
* <edge path> <output path> <num iterations>
* }}}
*/
def main(args: Array[String]): Unit
}
/**
* Single Source Shortest Paths in Scala using scatter-gather iteration.
* Shows Scala functional programming patterns with Gelly.
*/
object SingleSourceShortestPaths {
/**
* Main entry point for Scala SSSP.
*
* @param args positional command-line arguments, in order:
* {{{
* <source vertex id> <input edges path> <output path> <num iterations>
* }}}
*/
def main(args: Array[String]): Unit
}
/**
* GSA Single Source Shortest Paths in Scala.
* Demonstrates the gather-sum-apply programming model in Scala.
*/
object GSASingleSourceShortestPaths {
/**
* Main entry point for Scala GSA SSSP.
*
* @param args positional command-line arguments, in order:
* {{{
* <source vertex id> <input edges path> <output path> <num iterations>
* }}}
*/
def main(args: Array[String]): Unit
}
// Graph algorithm interfaces
interface GraphAlgorithm<K, VV, EV, T> extends Serializable {
DataSet<T> run(Graph<K, VV, EV> input) throws Exception;
}
/** Implemented by example programs that describe themselves for help output and documentation. */
interface ProgramDescription {
/**
* Returns the program's description and usage.
*
* @return human-readable description text
*/
String getDescription();
}
// Gelly programming model interfaces
/**
* User function of the vertex-centric (Pregel) model, invoked per vertex with its messages.
*
* @param <K> vertex key type
* @param <VV> vertex value type
* @param <EV> edge value type
* @param <M> message type
*/
abstract class ComputeFunction<K, VV, EV, M> {
/**
* Processes the messages received by one vertex.
*
* @param vertex the vertex being computed
* @param messages the vertex's incoming messages
* @throws Exception if the computation fails
*/
public abstract void compute(Vertex<K, VV> vertex, MessageIterator<M> messages) throws Exception;
}
/**
* Scatter-phase user function of the scatter-gather model; sends messages from a vertex.
*
* <p>NOTE(review): Flink's own MessagingFunction declares its type parameters in the
* order {@code <K, VV, Message, EV>}. All uses in this reference pass
* {@code Double, Double} for the last two, so confirm which order is intended here.
*
* @param <K> vertex key type
* @param <VV> vertex value type
* @param <EV> edge value type
* @param <M> message type
*/
abstract class MessagingFunction<K, VV, EV, M> {
/**
* Sends messages on behalf of one vertex.
*
* @param vertex the sending vertex
* @throws Exception if sending fails
*/
public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
}
/**
* Gather-phase user function of the scatter-gather model; updates a vertex from its messages.
*
* @param <K> vertex key type
* @param <VV> vertex value type
* @param <M> message type
*/
abstract class VertexUpdateFunction<K, VV, M> {
/**
* Updates one vertex from the messages it received.
*
* @param vertex the vertex to update
* @param inMessages the vertex's incoming messages
* @throws Exception if the update fails
*/
public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;
}
// GSA interfaces
/**
* Gather phase of the GSA model: produces a value from one neighbor.
*
* @param <VV> vertex value type
* @param <EV> edge value type
* @param <M> gathered value type
*/
interface GatherFunction<VV, EV, M> {
/**
* Produces the gathered value for one neighbor.
*
* @param neighbor the neighbor to gather from
* @return the gathered value
* @throws Exception if gathering fails
*/
M gather(Neighbor<VV, EV> neighbor) throws Exception;
}
/**
* Sum phase of the GSA model: combines two gathered values into one.
*
* @param <VV> vertex value type
* @param <EV> edge value type
* @param <M> gathered value type
*/
interface SumFunction<VV, EV, M> {
/**
* Combines two gathered values.
*
* @param newValue a newly gathered value
* @param currentValue the value accumulated so far
* @return the combined value
* @throws Exception if combining fails
*/
M sum(M newValue, M currentValue) throws Exception;
}
/**
* Apply phase of the GSA model: updates a vertex from the summed value.
*
* <p>NOTE(review): Flink's own GSA ApplyFunction is declared as
* {@code apply(M newValue, VV currentValue)} and sets its result separately.
* Confirm that the vertex-based signature here is intended.
*
* @param <K> vertex key type
* @param <VV> vertex value type
* @param <M> summed value type
*/
interface ApplyFunction<K, VV, M> {
/**
* Applies the summed value to one vertex.
*
* @param newValue the value produced by the sum phase
* @param vertex the vertex to update
* @throws Exception if the update fails
*/
void apply(M newValue, Vertex<K, VV> vertex) throws Exception;
}
// Flink DataSet API types
/**
* Placeholder for Flink's distributed DataSet type; its operations are omitted from this reference.
*
* @param <T> element type
*/
class DataSet<T> {
// Distributed dataset operations
}
/**
* Simplified view of Flink's two-field tuple, whose fields are accessed positionally.
*
* @param <T0> type of field {@code f0}
* @param <T1> type of field {@code f1}
*/
class Tuple2<T0, T1> {
public T0 f0; // first field
public T1 f1; // second field
}
/**
* Simplified view of Flink's three-field tuple, whose fields are accessed positionally.
*
* @param <T0> type of field {@code f0}
* @param <T1> type of field {@code f1}
* @param <T2> type of field {@code f2}
*/
class Tuple3<T0, T1, T2> {
public T0 f0; // first field
public T1 f1; // second field
public T2 f2; // third field
}