Apache Flink Gelly Examples: a collection of graph processing examples and algorithms built on the Flink Gelly library
The legacy examples package contains standalone implementations of graph algorithms demonstrating different Flink Gelly programming models. These examples serve as educational material and reference implementations, showing scatter-gather, gather-sum-apply (GSA), and vertex-centric (Pregel) approaches to graph processing.
Computes shortest paths from a single source vertex using the scatter-gather programming model.
/**
* Single Source Shortest Paths using scatter-gather iteration
* Demonstrates message passing between vertices for distance computation
*/
public class SingleSourceShortestPaths implements ProgramDescription {
/**
* Main entry point for command-line execution
* @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Program description for help and documentation
* @return Description of the SSSP algorithm and usage
*/
public String getDescription();
}

Command-line Usage:
flink run flink-gelly-examples_2.10-1.3.3.jar \
org.apache.flink.graph.examples.SingleSourceShortestPaths \
<source_vertex_id> <input_edges_path> <output_path> <num_iterations>

Usage Example:
flink run flink-gelly-examples_2.10-1.3.3.jar \
org.apache.flink.graph.examples.SingleSourceShortestPaths \
1 edges.csv shortest_paths.csv 10

Computes shortest paths from a single source vertex using the gather-sum-apply (GSA) programming model.
/**
* Single Source Shortest Paths using gather-sum-apply iteration
* Demonstrates GSA programming model with gather, sum, and apply phases
*/
public class GSASingleSourceShortestPaths implements ProgramDescription {
/**
* Main entry point for command-line execution
* @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Program description for help and documentation
* @return Description of the GSA SSSP algorithm and usage
*/
public String getDescription();
}

Computes shortest paths from a single source vertex using the vertex-centric (Pregel) programming model.
/**
* Single Source Shortest Paths using vertex-centric (Pregel) computation
* Demonstrates Pregel programming model with compute functions and message combiners
*/
public class PregelSSSP implements ProgramDescription {
/**
* Main entry point for command-line execution
* @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Program description for help and documentation
* @return Description of the Pregel SSSP algorithm and usage
*/
public String getDescription();
}

Inner Classes:
/**
* Vertex compute function for Pregel SSSP implementation
* Processes incoming messages and updates vertex state
*/
public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages) throws Exception;
}
/**
* Message combiner for Pregel SSSP to reduce message overhead
* Combines multiple messages to the same vertex
*/
public static final class SSSPCombiner extends MessageCombiner<Long, Double> {
public void combineMessages(MessageIterator<Double> messages) throws Exception;
}

A more complex example that combines DataSet operations with the Gelly API to analyze a user-song bipartite graph.
/**
* Music Profiles analysis using bipartite graphs and community detection
* Demonstrates integration of DataSet operations with graph processing
*/
public class MusicProfiles implements ProgramDescription {
/**
* Main entry point for command-line execution
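* @param args Command-line arguments: <input user song triplets path> <input song mismatches path> <output top tracks path> <playcount threshold> <output communities path> <num iterations>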
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Program description for help and documentation
* @return Description of the music profiles analysis pipeline
*/
public String getDescription();
}

Command-line Parameters:
<input_user_song_triplets_path> <input_song_mismatches_path> <output_top_tracks_path>
<playcount_threshold> <output_communities_path> <num_iterations>

Public Transformation Classes:
/**
* Extract mismatch song IDs from mismatch data
* Maps mismatch records to song ID strings
*/
public static final class ExtractMismatchSongIds implements MapFunction<String, String> {
public String map(String value) throws Exception;
}
/**
* Filter out songs that appear in the mismatch list
* Removes problematic songs from the analysis dataset
*/
public static final class FilterOutMismatches implements FilterFunction<Tuple3<String, String, Integer>> {
public boolean filter(Tuple3<String, String, Integer> value) throws Exception;
}
/**
* Filter song vertices from user-song bipartite graph
* Identifies vertices representing songs vs users
*/
public static final class FilterSongNodes implements FilterFunction<Vertex<String, Double>> {
public boolean filter(Vertex<String, Double> vertex) throws Exception;
}
/**
* Get top song per user based on play counts
* Finds the most played song for each user
*/
public static final class GetTopSongPerUser implements GroupReduceFunction<Tuple3<String, String, Integer>, Tuple2<String, String>> {
public void reduce(Iterable<Tuple3<String, String, Integer>> values, Collector<Tuple2<String, String>> out) throws Exception;
}
/**
* Create edges between users who share similar music preferences
* Builds user similarity graph based on common top songs
*/
public static final class CreateSimilarUserEdges implements GroupReduceFunction<Tuple2<String, String>, Edge<String, NullValue>> {
public void reduce(Iterable<Tuple2<String, String>> values, Collector<Edge<String, NullValue>> out) throws Exception;
}

Demonstrates graph transformations and geometric computations with triplet operations.
/**
* Euclidean Graph Weighting example showing graph transformations
* Demonstrates triplet operations and custom distance computations
*/
public class EuclideanGraphWeighing implements ProgramDescription {
/**
* Main entry point for command-line execution
* @param args Command-line arguments: <input vertices path> <input edges path> <output path>
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Program description for help and documentation
* @return Description of the Euclidean weighting transformation
*/
public String getDescription();
}

Public Utility Classes:
/**
* 2D Point class for geometric computations
* Represents vertex positions in Euclidean space
*/
public static class Point implements Serializable {
public double x;
public double y;
public Point(double x, double y);
/**
* Calculate Euclidean distance between two points
* @param other Other point to calculate distance to
* @return Euclidean distance as double
*/
public double euclideanDistance(Point other);
}

An advanced example showing how shortest paths are updated incrementally when an edge is removed from the graph.
/**
* Incremental Single Source Shortest Paths with edge removal
* Demonstrates dynamic graph updates and incremental computation
*/
public class IncrementalSSSP implements ProgramDescription {
/**
* Main entry point for command-line execution
* @param args Complex parameter list for incremental SSSP
* @throws Exception if execution fails
*/
public static void main(String[] args) throws Exception;
/**
* Program description for help and documentation
* @return Description of the incremental SSSP algorithm
*/
public String getDescription();
/**
* Check if an edge is part of the shortest path tree
* @param edgeToBeRemoved Edge to check for SSSP membership
* @param edgesInSSSP DataSet of edges currently in the SSSP tree
* @return True if edge is in SSSP, false otherwise
* @throws Exception if check fails
*/
public static boolean isInSSSP(Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception;
}

Inner Algorithm Classes:
/**
* Message passing function for invalidating affected paths
* Sends invalidation messages when edges are removed
*/
public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
public void sendMessages(Vertex<Long, Double> vertex) throws Exception;
}
/**
* Vertex update function for recalculating distances
* Updates vertex distances based on invalidation messages
*/
public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}

Generic PageRank algorithm, implemented with scatter-gather iteration, that can be reused programmatically as a GraphAlgorithm.
/**
* Generic PageRank algorithm implementation using scatter-gather iteration
* Can be used as a reusable component in larger applications
*/
public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
/**
* Create PageRank algorithm with parameters
* @param beta Damping factor for random walk (typically 0.85)
* @param maxIterations Maximum number of iterations
*/
public PageRank(double beta, int maxIterations);
/**
* Execute PageRank on the input graph
* @param network Input graph with Double vertex and edge values
* @return DataSet of vertices with PageRank scores
* @throws Exception if algorithm execution fails
*/
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}

Inner Algorithm Classes:
/**
* Message passing function for PageRank score distribution
* Sends rank messages along outgoing edges
*/
public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
public void sendMessages(Vertex<K, Double> vertex) throws Exception;
}
/**
* Vertex update function for PageRank score computation
* Updates vertex ranks based on incoming messages
*/
public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) throws Exception;
}
/**
* Initialization function for vertex weights
* Sets initial PageRank values for all vertices
*/
public static final class InitWeights<K> implements MapFunction<Vertex<K, NullValue>, Double> {
public Double map(Vertex<K, NullValue> value) throws Exception;
}

PageRank implementation using the gather-sum-apply (GSA) programming model.
/**
* PageRank algorithm using gather-sum-apply iteration
* Alternative implementation demonstrating GSA programming model
*/
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
/**
* Create GSA PageRank algorithm with parameters
* @param beta Damping factor for random walk
* @param maxIterations Maximum number of iterations
*/
public GSAPageRank(double beta, int maxIterations);
/**
* Execute GSA PageRank on the input graph
* @param network Input graph with Double vertex and edge values
* @return DataSet of vertices with PageRank scores
* @throws Exception if algorithm execution fails
*/
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception;
}

Inner GSA Classes:
/**
* Gather function for collecting neighbor ranks
* Gathers PageRank values from neighboring vertices
*/
public static final class GatherRanks implements GatherFunction<Double, Double, Double> {
public Double gather(Neighbor<Double, Double> neighbor) throws Exception;
}
/**
* Sum function for aggregating gathered ranks
* Sums all gathered rank values for each vertex
*/
public static final class SumRanks implements SumFunction<Double, Double, Double> {
public Double sum(Double newValue, Double currentValue) throws Exception;
}
/**
* Apply function for updating vertex ranks
* Applies PageRank formula with damping factor
*/
public static final class UpdateRanks<K> implements ApplyFunction<K, Double, Double> {
public void apply(Double summedRanks, Vertex<K, Double> vertex) throws Exception;
}

The examples package also includes test data classes that provide sample graphs and expected results for testing and demonstration:
/**
* PageRank test data generator
* Provides sample graphs and expected results for PageRank algorithm
*/
public class PageRankData {
/**
* Get default edge dataset for PageRank testing
* @param env Flink ExecutionEnvironment
* @return DataSet of sample edges
*/
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
// Public constants for test data
public static final String EDGES = "...";
public static final String RANKS_AFTER_3_ITERATIONS = "...";
}
/**
* Single Source Shortest Paths test data generator
* Provides sample graphs and expected SSSP results
*/
public class SingleSourceShortestPathsData {
/**
* Get default edge dataset for SSSP testing
* @param env Flink ExecutionEnvironment
* @return DataSet of sample edges with weights
*/
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
// Public constants
public static final Long SRC_VERTEX_ID = 1L;
public static final String EDGES = "...";
public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS = "...";
}
/**
* Music profiles test data generator
* Provides sample user-song interaction data
*/
public class MusicProfilesData {
/**
* Get sample user-song triplet data
* @param env Flink ExecutionEnvironment
* @return DataSet of user-song-playcount triplets
*/
public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env);
/**
* Get sample mismatch data for filtering
* @param env Flink ExecutionEnvironment
* @return DataSet of problematic song IDs
*/
public static DataSet<String> getMismatches(ExecutionEnvironment env);
}
/**
* Euclidean graph test data generator
* Provides sample geometric graphs with point coordinates
*/
public class EuclideanGraphData {
/**
* Get default vertex dataset with 2D coordinates
* @param env Flink ExecutionEnvironment
* @return DataSet of vertices with Point values
*/
public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env);
/**
* Get corresponding edge dataset
* @param env Flink ExecutionEnvironment
* @return DataSet of edges with distance weights
*/
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}

The package also includes Scala implementations demonstrating the Scala Graph API:
/**
* Connected Components implementation in Scala
* Demonstrates Scala Graph API usage with GSA
*/
object ConnectedComponents {
/**
* Main entry point for Scala Connected Components
* @param args Command-line arguments: <edge path> <output path> <num iterations>
*/
def main(args: Array[String]): Unit
}
/**
* Single Source Shortest Paths in Scala using scatter-gather
* Shows Scala functional programming patterns with Gelly
*/
object SingleSourceShortestPaths {
/**
* Main entry point for Scala SSSP
* @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
*/
def main(args: Array[String]): Unit
}
/**
* GSA Single Source Shortest Paths in Scala
* Demonstrates GSA programming model in Scala
*/
object GSASingleSourceShortestPaths {
/**
* Main entry point for Scala GSA SSSP
* @param args Command-line arguments: <source vertex id> <input edges path> <output path> <num iterations>
*/
def main(args: Array[String]): Unit
}

// Graph algorithm interfaces
interface GraphAlgorithm<K, VV, EV, T> extends Serializable {
T run(Graph<K, VV, EV> input) throws Exception;
}
interface ProgramDescription {
String getDescription();
}
// Gelly programming model interfaces
abstract class ComputeFunction<K, VV, EV, M> {
public abstract void compute(Vertex<K, VV> vertex, MessageIterator<M> messages) throws Exception;
}
abstract class MessagingFunction<K, VV, EV, M> {
public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
}
abstract class VertexUpdateFunction<K, VV, M> {
public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;
}
// GSA interfaces
interface GatherFunction<VV, EV, M> {
M gather(Neighbor<VV, EV> neighbor) throws Exception;
}
interface SumFunction<VV, EV, M> {
M sum(M newValue, M currentValue) throws Exception;
}
interface ApplyFunction<K, VV, M> {
void apply(M newValue, Vertex<K, VV> vertex) throws Exception;
}
// Flink DataSet API types
class DataSet<T> {
// Distributed dataset operations
}
class Tuple2<T0, T1> {
public T0 f0;
public T1 f1;
}
class Tuple3<T0, T1, T2> {
public T0 f0;
public T1 f1;
public T2 f2;
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gelly-examples-2-10