A collection of example applications demonstrating graph processing algorithms using Apache Flink's Gelly Graph API
Standalone example implementations demonstrating various graph processing algorithms and programming patterns. These examples showcase different approaches to graph analysis using Apache Flink's Gelly API and serve as reference implementations for common use cases.
Collection of standalone Java implementations demonstrating different algorithm approaches and Flink programming patterns.
Generic PageRank implementation using scatter-gather pattern with configurable parameters.
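As a rough illustration of the scatter-gather pattern named above, the following plain-Java sketch runs the same kind of iteration on an in-memory edge list. It does not use Gelly. The class name `PageRankSketch`, the integer vertex ids, the uniform `1 / outDegree` transition weights and the `1 / n` starting rank are all assumptions made for this sketch, and every vertex is assumed to have at least one outgoing edge.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch; not the Gelly PageRank class.
final class PageRankSketch {
    /**
     * edges[i] = {source, target}; vertices are numbered 0..n-1.
     * Assumes every vertex has at least one outgoing edge.
     */
    static double[] run(int n, int[][] edges, double beta, int maxIterations) {
        int[] outDegree = new int[n];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }

        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n);

        for (int iter = 0; iter < maxIterations; iter++) {
            double[] received = new double[n];
            // Scatter: each source sends rank / outDegree along every out-edge.
            for (int[] e : edges) {
                received[e[1]] += rank[e[0]] / outDegree[e[0]];
            }
            // Gather: combine the summed messages with the teleport term.
            for (int v = 0; v < n; v++) {
                rank[v] = beta * received[v] + (1 - beta) / n;
            }
        }
        return rank;
    }
}
```

On a directed three-vertex cycle every vertex keeps rank 1/3, which makes a convenient sanity check for the damping term.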
/**
* PageRank algorithm implementation with generic key type support
* Uses scatter-gather pattern for distributed computation
* @param <K> Vertex key type
*/
public class PageRank<K> {
/**
* Create PageRank instance with specified parameters
* @param beta Damping factor (typically 0.85)
* @param maxIterations Maximum number of iterations
*/
public PageRank(double beta, int maxIterations);
/**
* Execute PageRank algorithm on input graph
* @param network Input graph with double-valued vertices and edges
* @return DataSet of vertices with PageRank scores
*/
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network);
}

Usage Examples:
import org.apache.flink.graph.examples.PageRank;
// Create PageRank instance
PageRank<Long> pageRank = new PageRank<>(0.85, 10);
// Prepare graph with initial values
Graph<Long, Double, Double> graph = inputGraph
.mapVertices(new MapFunction<Vertex<Long, NullValue>, Double>() {
public Double map(Vertex<Long, NullValue> vertex) {
return 1.0; // Initial PageRank value
}
});
// Execute algorithm
DataSet<Vertex<Long, Double>> results = pageRank.run(graph);
results.print();

Standard implementation of single source shortest paths using iterative vertex-centric computation.
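The iterative computation described above can be sketched without Flink as repeated edge relaxation with an iteration cap. This is only an illustration under assumptions: `SsspSketch`, the integer vertex ids and the `{source, target, weight}` edge encoding are invented for the sketch and are not part of the Gelly example.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch; not the Gelly SingleSourceShortestPaths class.
final class SsspSketch {
    /** edges[i] = {source, target, weight}; vertices are numbered 0..n-1. */
    static double[] run(int n, double[][] edges, int src, int maxIterations) {
        double[] dist = new double[n];
        Arrays.fill(dist, Double.POSITIVE_INFINITY);
        dist[src] = 0.0;

        for (int iter = 0; iter < maxIterations; iter++) {
            // Read from the previous iteration, write into a copy.
            double[] next = dist.clone();
            boolean changed = false;
            for (double[] e : edges) {
                int u = (int) e[0];
                int v = (int) e[1];
                double candidate = dist[u] + e[2];
                if (candidate < next[v]) {
                    next[v] = candidate;
                    changed = true;
                }
            }
            dist = next;
            if (!changed) {
                break; // converged before reaching maxIterations
            }
        }
        return dist;
    }
}
```

Vertices that the source cannot reach keep the value `Double.POSITIVE_INFINITY`, which is why the usage example further down filters on that constant.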
/**
* Single Source Shortest Paths algorithm
* Computes shortest distances from a source vertex to all reachable vertices
*/
public class SingleSourceShortestPaths {
/**
* Execute SSSP from specified source vertex
* @param graph Input graph with numeric edge weights
* @param srcVertexId Source vertex identifier
* @param maxIterations Maximum number of iterations
* @return DataSet of vertices with shortest distances
*/
public static DataSet<Vertex<Long, Double>> run(
Graph<Long, NullValue, Double> graph,
Long srcVertexId,
Integer maxIterations
) throws Exception;
}

Usage Examples:
// Execute SSSP from vertex 1
DataSet<Vertex<Long, Double>> distances =
SingleSourceShortestPaths.run(weightedGraph, 1L, 10);
// Filter reachable vertices
DataSet<Vertex<Long, Double>> reachable = distances
.filter(vertex -> vertex.getValue() < Double.POSITIVE_INFINITY);

Examples demonstrating the Gather-Sum-Apply (GSA) programming model.
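To make the three GSA phases concrete, here is a minimal plain-Java sketch of one superstep for shortest paths. It is not Gelly code: `GsaSketch`, the array-based graph encoding and the choice of `min` as the sum function are assumptions made for this illustration.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch of one gather-sum-apply superstep.
final class GsaSketch {
    /** dist[v] is the current distance of v; edges[i] = {source, target, weight}. */
    static double[] superstep(double[] dist, double[][] edges) {
        double[] summed = new double[dist.length];
        Arrays.fill(summed, Double.POSITIVE_INFINITY);

        for (double[] e : edges) {
            int src = (int) e[0];
            int trg = (int) e[1];
            // Gather: produce one candidate value per incoming edge.
            double gathered = dist[src] + e[2];
            // Sum: reduce the candidates with an associative function (min here).
            summed[trg] = Math.min(summed[trg], gathered);
        }

        double[] next = dist.clone();
        for (int v = 0; v < dist.length; v++) {
            // Apply: update the vertex only when the reduced value improves it.
            if (summed[v] < dist[v]) {
                next[v] = summed[v];
            }
        }
        return next;
    }
}
```

Calling `superstep` repeatedly until the array stops changing mirrors the role that an iteration limit plays in the examples on this page.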
/**
* PageRank implementation using Gather-Sum-Apply pattern
*/
public class GSAPageRank {
public static void main(String[] args) throws Exception;
/** Custom gather function for PageRank computation */
public static class GatherRanks extends GatherFunction<Double, Double, Double>;
/** Custom sum function for aggregating ranks */
public static class SumRanks extends SumFunction<Double, Double, Double>;
/** Custom apply function for updating vertex values */
public static class UpdateRanks extends ApplyFunction<Long, Double, Double>;
}

/**
* SSSP implementation using Gather-Sum-Apply pattern
*/
public class GSASingleSourceShortestPaths {
public static void main(String[] args) throws Exception;
/** Gather minimum distances from neighbors */
public static class GatherDistances extends GatherFunction<Double, Double, Double>;
/** Select minimum distance */
public static class SelectMinDistance extends SumFunction<Double, Double, Double>;
/** Update vertex distance if improvement found */
public static class UpdateDistance extends ApplyFunction<Long, Double, Double>;
}

Advanced implementation demonstrating incremental graph processing with dynamic updates.
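The idea of updating distances incrementally instead of recomputing them can be sketched in plain Java for the simplest case, an edge insertion. This sketch makes several assumptions: it is not the Gelly `IncrementalSSSP` logic, it does not handle deletions, weights are non-negative, and `IncrementalSsspSketch`, `emptyGraph` and `insertEdge` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-alone sketch; covers edge insertion only.
final class IncrementalSsspSketch {
    /** Builds an adjacency list with n vertices and no edges. */
    static List<List<double[]>> emptyGraph(int n) {
        List<List<double[]>> adjacency = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            adjacency.add(new ArrayList<>());
        }
        return adjacency;
    }

    /**
     * Adds edge u -> v with weight w and repairs dist in place.
     * adjacency.get(x) holds {target, weight} pairs; weights must be non-negative.
     */
    static void insertEdge(List<List<double[]>> adjacency, double[] dist, int u, int v, double w) {
        adjacency.get(u).add(new double[] {v, w});
        if (dist[u] + w >= dist[v]) {
            return; // the new edge does not shorten any path
        }
        dist[v] = dist[u] + w;
        // Propagate the improvement only through vertices that actually changed.
        Deque<Integer> queue = new ArrayDeque<>();
        queue.add(v);
        while (!queue.isEmpty()) {
            int x = queue.poll();
            for (double[] e : adjacency.get(x)) {
                int y = (int) e[0];
                if (dist[x] + e[1] < dist[y]) {
                    dist[y] = dist[x] + e[1];
                    queue.add(y);
                }
            }
        }
    }
}
```

Only the part of the graph downstream of the inserted edge is visited, which is the saving that an incremental approach aims for.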
/**
* Incremental Single Source Shortest Paths
* Efficiently recomputes shortest paths when graph structure changes
*/
public class IncrementalSSSP {
public static void main(String[] args) throws Exception;
/**
* Process incremental updates to shortest path tree
* @param graph Base graph with current distances
* @param edgeUpdates DataSet of edge additions/deletions
* @return Updated shortest path tree
*/
public static DataSet<Vertex<Long, Double>> processUpdates(
Graph<Long, Double, Double> graph,
DataSet<Edge<Long, Double>> edgeUpdates
) throws Exception;
}

Real-world example demonstrating graph-based recommendation system using collaborative filtering.
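One simple way to read "collaborative filtering on a user-song graph" is a three-hop walk: target user, shared song, other user, unseen song. The plain-Java sketch below implements that reading. How the actual example scores songs is not stated here, so the scoring rule (summed play counts), the class `RecommendationSketch` and the map-based graph encoding are all assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-alone sketch; the scoring rule is an assumption.
final class RecommendationSketch {
    /** plays maps user -> (song -> play count), i.e. the weighted edges of the bipartite graph. */
    static List<Map.Entry<String, Double>> recommend(
            Map<String, Map<String, Double>> plays, String target) {
        Set<String> known =
                plays.getOrDefault(target, Collections.<String, Double>emptyMap()).keySet();
        Map<String, Double> scores = new HashMap<>();

        for (Map.Entry<String, Map<String, Double>> other : plays.entrySet()) {
            if (other.getKey().equals(target)) {
                continue;
            }
            Map<String, Double> songs = other.getValue();
            // Hops 1 and 2: keep only users who share at least one song with the target.
            boolean sharesSong = songs.keySet().stream().anyMatch(known::contains);
            if (!sharesSong) {
                continue;
            }
            // Hop 3: credit every song of that user which the target has not played.
            songs.forEach((song, count) -> {
                if (!known.contains(song)) {
                    scores.merge(song, count, Double::sum);
                }
            });
        }

        // Highest score first; ties broken by song name for a stable order.
        List<Map.Entry<String, Double>> ranked = new ArrayList<>(scores.entrySet());
        ranked.sort(Map.Entry.<String, Double>comparingByValue().reversed()
                .thenComparing(Map.Entry.comparingByKey()));
        return ranked;
    }
}
```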
/**
* Music recommendation system using graph processing
* Builds user-song bipartite graph and computes recommendations
*/
public class MusicProfiles {
public static void main(String[] args) throws Exception;
/**
* Generate music recommendations based on user listening history
* @param userSongGraph Bipartite graph of users and songs
* @param targetUser User to generate recommendations for
* @return Ranked list of song recommendations
*/
public static DataSet<Tuple2<String, Double>> generateRecommendations(
Graph<String, NullValue, Double> userSongGraph,
String targetUser
) throws Exception;
}

Example demonstrating spatial graph processing with geometric distance calculations.
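The geometric step itself is small: for each edge, look up the coordinates of both endpoints and take the Euclidean distance. A plain-Java sketch, assuming 2D coordinates stored in arrays and a hypothetical class name `EuclideanWeightSketch`:

```java
// Hypothetical stand-alone sketch; not the Gelly EuclideanGraphWeighing class.
final class EuclideanWeightSketch {
    /** coords[v] = {x, y}; edges[i] = {source, target}. Returns one weight per edge. */
    static double[] weigh(double[][] coords, int[][] edges) {
        double[] weights = new double[edges.length];
        for (int i = 0; i < edges.length; i++) {
            double[] a = coords[edges[i][0]];
            double[] b = coords[edges[i][1]];
            // Math.hypot computes sqrt(dx^2 + dy^2) without intermediate overflow.
            weights[i] = Math.hypot(a[0] - b[0], a[1] - b[1]);
        }
        return weights;
    }
}
```

In a distributed setting the coordinate lookup for both endpoints is what requires joining edges with vertex values; the arithmetic stays the same.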
/**
* Weight graph edges by Euclidean distance between vertex coordinates
* Useful for spatial networks and geographic data processing
*/
public class EuclideanGraphWeighing {
public static void main(String[] args) throws Exception;
/**
* Compute Euclidean distances for graph edges
* @param graph Graph with vertex coordinates
* @return Graph with distance-weighted edges
*/
public static Graph<Long, Point, Double> computeDistances(
Graph<Long, Point, NullValue> graph
) throws Exception;
}

Implementation showcasing the Pregel programming model for vertex-centric computation.
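In the Pregel model each vertex reacts to the messages it received in the previous superstep, optionally updates its value, and sends new messages along its out-edges. A combiner merges messages bound for the same vertex. The following plain-Java sketch imitates that loop for shortest paths. It is an illustration only: `PregelSsspSketch`, the array-based inbox and the seed message that activates the source are assumptions, not Gelly behaviour.

```java
import java.util.Arrays;

// Hypothetical stand-alone sketch of a vertex-centric superstep loop.
final class PregelSsspSketch {
    /** edges[i] = {source, target, weight}; vertices are numbered 0..n-1. */
    static double[] run(int n, double[][] edges, int src, int maxIterations) {
        double[] value = new double[n];
        Arrays.fill(value, Double.POSITIVE_INFINITY);
        double[] inbox = new double[n];
        Arrays.fill(inbox, Double.POSITIVE_INFINITY);
        inbox[src] = 0.0; // seed message that activates the source
        boolean anyMessage = true;

        for (int step = 0; step < maxIterations && anyMessage; step++) {
            double[] outbox = new double[n];
            Arrays.fill(outbox, Double.POSITIVE_INFINITY);
            anyMessage = false;
            for (int v = 0; v < n; v++) {
                // Compute function: stay silent unless the best message improves the value.
                if (inbox[v] >= value[v]) {
                    continue;
                }
                value[v] = inbox[v];
                for (double[] e : edges) {
                    if ((int) e[0] != v) {
                        continue;
                    }
                    int target = (int) e[1];
                    // Combiner: keep only the minimum message per target vertex.
                    outbox[target] = Math.min(outbox[target], value[v] + e[2]);
                    anyMessage = true;
                }
            }
            inbox = outbox;
        }
        return value;
    }
}
```

The loop stops early once a superstep produces no messages, which corresponds to all vertices having halted.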
/**
* Single Source Shortest Paths using Pregel model
* Demonstrates vertex-centric programming approach
*/
public class PregelSSSP {
public static void main(String[] args) throws Exception;
/** Pregel compute function for SSSP */
public static class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double>;
/** Message combiner for efficiency */
public static class MinMessageCombiner extends MessageCombiner<Long, Double>;
}

Scala implementations demonstrating functional programming approaches to graph processing.
/**
* Connected Components implementation in Scala
* Demonstrates functional programming style with Flink Scala API
*/
object ConnectedComponents {
def main(args: Array[String]): Unit
/**
* Find connected components using label propagation
* @param graph Input graph
* @param maxIterations Maximum iterations
* @return Components as vertex-component pairs
*/
def findComponents(
graph: Graph[Long, Long, NullValue],
maxIterations: Int
): DataSet[(Long, Long)]
}

/**
* GSA Single Source Shortest Paths in Scala
* Functional implementation of gather-sum-apply pattern
*/
object GSASingleSourceShortestPaths {
def main(args: Array[String]): Unit
/**
* Compute shortest paths using functional GSA approach
* @param graph Weighted graph
* @param srcVertex Source vertex
* @return Shortest distances from source
*/
def computeShortestPaths(
graph: Graph[Long, Double, Double],
srcVertex: Long
): DataSet[(Long, Double)]
}

/**
* Standard SSSP implementation in Scala
* Demonstrates idiomatic Scala programming with Flink
*/
object SingleSourceShortestPaths {
def main(args: Array[String]): Unit
// Functional vertex update operations
def updateDistance(vertex: Vertex[Long, Double], messages: Iterator[Double]): Double
// Distance initialization
def initializeDistances(srcVertex: Long): MapFunction[Vertex[Long, NullValue], Double]
}

Collection of utility classes providing default datasets for testing and benchmarking examples.
/**
* Default datasets for PageRank testing
*/
public class PageRankData {
/** Get default vertex dataset */
public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env);
/** Get default edge dataset */
public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env);
/** Expected PageRank results for validation */
public static String getExpectedResult();
}
/**
* Test data for connected components algorithms
*/
public class ConnectedComponentsDefaultData {
public static DataSet<Vertex<Long, Long>> getDefaultVertexDataSet(ExecutionEnvironment env);
public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env);
public static String getExpectedResult();
}
/**
* Spatial graph data for Euclidean distance examples
*/
public class EuclideanGraphData {
/** Vertices with 2D coordinates */
public static DataSet<Vertex<Long, Point>> getDefaultVertexDataSet(ExecutionEnvironment env);
/** Edges for spatial connectivity */
public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env);
}
/**
* Music recommendation test data
*/
public class MusicProfilesData {
/** User-song interaction data */
public static DataSet<Tuple3<String, String, Double>> getUserSongRatings(ExecutionEnvironment env);
/** Expected recommendation results */
public static String getExpectedResult();
}

// Direct execution of example algorithms
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create test graph
Graph<Long, NullValue, Double> graph = Graph.fromDataSet(
vertices, edges, env
);
// Run algorithm
DataSet<Vertex<Long, Double>> result =
SingleSourceShortestPaths.run(graph, 1L, 10);
// Print results
result.print();
}

// Use examples as reference for custom drivers
public class CustomPageRankDriver extends DriverBase<Long, NullValue, Double> {
private PageRank<Long> pageRankImpl;
@Override
public DataSet plan(Graph<Long, NullValue, Double> graph) throws Exception {
// Initialize PageRank with parameters
pageRankImpl = new PageRank<>(
dampingFactor.getValue(),
iterations.getValue()
);
// Convert graph format and execute
Graph<Long, Double, Double> initializedGraph =
graph.mapVertices(vertex -> 1.0);
return pageRankImpl.run(initializedGraph);
}
}

// Use data classes for consistent testing
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Get standard test data
DataSet<Vertex<Long, Double>> vertices = PageRankData.getDefaultVertexDataSet(env);
DataSet<Edge<Long, Double>> edges = PageRankData.getDefaultEdgeDataSet(env);
Graph<Long, Double, Double> testGraph = Graph.fromDataSet(vertices, edges, env);
// Run algorithm
PageRank<Long> algorithm = new PageRank<>(0.85, 10);
DataSet<Vertex<Long, Double>> results = algorithm.run(testGraph);
// Validate against expected results
String expected = PageRankData.getExpectedResult();
String actual = results.collect().toString();
assertEquals(expected, actual);

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gelly-examples-2-12