Apache Flink Gelly Examples - A collection of graph processing examples and algorithms using the Flink Gelly graph processing library
npx @tessl/cli install tessl/maven-org-apache-flink--flink-gelly-examples_2-10@1.3.0

Apache Flink Gelly Examples is a collection of graph processing examples and algorithms built on Apache Flink's Gelly graph processing library. It provides both a unified command-line driver framework for executing graph algorithms and individual algorithm implementations for programmatic use. It includes implementations of fundamental graph algorithms such as PageRank, Single Source Shortest Paths, and Connected Components, along with graph generators for testing and benchmarking.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-examples_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Main entry point for command-line execution:
import org.apache.flink.graph.Runner;

For programmatic usage of drivers:
import org.apache.flink.graph.drivers.PageRank;
import org.apache.flink.graph.drivers.ConnectedComponents;
import org.apache.flink.graph.drivers.input.RMatGraph;
import org.apache.flink.graph.drivers.input.CSV;

Legacy example algorithms:
import org.apache.flink.graph.examples.PageRank;
import org.apache.flink.graph.examples.SingleSourceShortestPaths;

Command-line execution using the unified Runner:
# Run PageRank on a complete graph with 1000 vertices
flink run flink-gelly-examples_2.10-1.3.3.jar \
    --algorithm PageRank \
    --input CompleteGraph --vertex_count 1000 \
    --output print

# Run Connected Components on CSV data
flink run flink-gelly-examples_2.10-1.3.3.jar \
    --algorithm ConnectedComponents \
    --input CSV --input_filename graph.csv \
    --output csv --output_filename results.csv

Programmatic usage with drivers:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.drivers.PageRank;
import org.apache.flink.graph.drivers.input.CompleteGraph;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create a complete graph with 100 vertices
CompleteGraph input = new CompleteGraph();
// Configure input parameters...
Graph<LongValue, NullValue, NullValue> graph = input.create(env);
// Run PageRank algorithm
PageRank<LongValue, NullValue, NullValue> pagerank = new PageRank<>();
// Configure algorithm parameters...
pagerank.plan(graph);
pagerank.print("PageRank Results");

Flink Gelly Examples is organized around several key architectural components, including a command-line driver (Runner) that coordinates input sources, algorithms, and output formatters through a consistent command-line interface.

Unified driver system for executing graph algorithms with configurable inputs and outputs. Provides consistent parameter handling and usage help.
public class Runner {
    public static void main(String[] args) throws Exception;
}

Collection of ready-to-use graph algorithm implementations including PageRank, HITS, Connected Components, clustering coefficients, and similarity measures.
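As a rough illustration of what one of these algorithms computes, the following plain-Java sketch runs PageRank by power iteration on a tiny adjacency list. It does not use Flink or Gelly: the class name `PageRankSketch`, the damping factor of 0.85, and the fixed iteration count are assumptions chosen for illustration, not the driver's implementation or defaults. The sketch also assumes every vertex has at least one outgoing edge.

```java
import java.util.Arrays;

public class PageRankSketch {
    /**
     * Computes PageRank scores by power iteration.
     * outLinks[v] lists the targets of v's outgoing edges.
     * Assumption: no vertex has an empty outLinks array (no dangling vertices).
     */
    public static double[] pageRank(int[][] outLinks, double damping, int iterations) {
        int n = outLinks.length;
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0 / n); // start from a uniform distribution

        for (int i = 0; i < iterations; i++) {
            double[] next = new double[n];
            Arrays.fill(next, (1.0 - damping) / n); // random-jump share
            for (int v = 0; v < n; v++) {
                // v splits its damped rank evenly among its targets
                double share = damping * rank[v] / outLinks[v].length;
                for (int target : outLinks[v]) {
                    next[target] += share;
                }
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        // Directed 3-cycle: 0 -> 1 -> 2 -> 0
        int[][] cycle = {{1}, {2}, {0}};
        System.out.println(Arrays.toString(pageRank(cycle, 0.85, 20)));
    }
}
```

On a symmetric structure such as the 3-cycle in `main`, every vertex ends up with roughly the same score, and the scores sum to approximately 1.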
public interface Driver<K, VV, EV> extends Parameterized {
    String getShortDescription();
    String getLongDescription();
    void plan(Graph<K, VV, EV> graph) throws Exception;
}

Comprehensive set of graph generators and file readers for creating test graphs, including random generators (R-MAT), regular structures (grids, complete graphs), and CSV file input.
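To make the idea of a generated input concrete, here is a minimal plain-Java sketch of the edge list a complete-graph generator might produce for a given vertex count. The class `CompleteGraphSketch` and its `edges` method are hypothetical and do not use Gelly's generator classes. The sketch assumes vertex IDs run from 0 to `vertexCount - 1`, with one directed edge in each direction and no self-loops.

```java
import java.util.ArrayList;
import java.util.List;

public class CompleteGraphSketch {
    /** Returns every ordered pair {source, target} with source != target. */
    public static List<long[]> edges(long vertexCount) {
        List<long[]> result = new ArrayList<>();
        for (long source = 0; source < vertexCount; source++) {
            for (long target = 0; target < vertexCount; target++) {
                if (source != target) {
                    result.add(new long[] {source, target});
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A complete graph on n vertices has n * (n - 1) directed edges.
        System.out.println(edges(4).size());
    }
}
```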
public interface Input<K, VV, EV> extends Parameterized {
    String getIdentity();
    Graph<K, VV, EV> create(ExecutionEnvironment env) throws Exception;
}

Type-safe parameter framework supporting various parameter types (long, double, boolean, choice, string) with validation and command-line parsing.
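The pattern described here can be sketched without Flink as a typed parameter that parses its own value from `--key value` arguments and rejects out-of-range input. Everything in this sketch is hypothetical: `LongParameterSketch`, `parseArgs`, and the use of `IllegalArgumentException` are stand-ins for illustration and are not the library's parameter classes.

```java
import java.util.HashMap;
import java.util.Map;

public class LongParameterSketch {
    private final String name;
    private final long min;
    private final long max;
    private long value;

    public LongParameterSketch(String name, long defaultValue, long min, long max) {
        this.name = name;
        this.min = min;
        this.max = max;
        this.value = defaultValue;
    }

    /** Collects "--key value" pairs into a map; rejects any other argument shape. */
    public static Map<String, String> parseArgs(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Arguments must come in --key value pairs");
        }
        Map<String, String> parsed = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but found: " + args[i]);
            }
            parsed.put(args[i].substring(2), args[i + 1]);
        }
        return parsed;
    }

    /** Reads this parameter from the parsed arguments, keeping the default if absent. */
    public void configure(Map<String, String> parsed) {
        String raw = parsed.get(name);
        if (raw == null) {
            return; // not supplied: keep the current value
        }
        long candidate = Long.parseLong(raw); // throws NumberFormatException on bad input
        if (candidate < min || candidate > max) {
            throw new IllegalArgumentException(
                name + " must be between " + min + " and " + max + ", got " + candidate);
        }
        value = candidate; // assigned only after validation succeeds
    }

    public long getValue() {
        return value;
    }

    public static void main(String[] args) {
        LongParameterSketch vertexCount =
            new LongParameterSketch("vertex_count", 10, 1, 1_000_000);
        vertexCount.configure(parseArgs(new String[] {"--vertex_count", "1000"}));
        System.out.println(vertexCount.getValue());
    }
}
```

Keeping validation inside the parameter object means each algorithm or input only declares its parameters, and the same parsing and error reporting apply everywhere.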
public interface Parameterized {
    String getName();
    String getUsage();
    void configure(ParameterTool parameterTool) throws ProgramParametrizationException;
}

Standalone example implementations demonstrating different Flink Gelly programming models including scatter-gather, gather-sum-apply (GSA), and vertex-centric (Pregel) approaches.
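To give a feel for the scatter-gather model, the sketch below computes single-source shortest paths in plain Java using alternating phases: active vertices send candidate distances along their outgoing edges, then each vertex keeps the smallest candidate it received. This is a single-machine illustration only. The class `ScatterGatherSketch` and its edge encoding are assumptions, it does not use Gelly's iteration API, and it assumes non-negative edge weights so that the loop terminates.

```java
import java.util.Arrays;

public class ScatterGatherSketch {
    /**
     * Single-source shortest paths in scatter-gather style.
     * Each edge is {source, target, weight}; vertex IDs are stored as doubles
     * for brevity. Assumption: weights are non-negative.
     */
    public static double[] shortestPaths(int vertexCount, double[][] edges, int source) {
        double[] distance = new double[vertexCount];
        Arrays.fill(distance, Double.POSITIVE_INFINITY);
        distance[source] = 0.0;

        boolean[] active = new boolean[vertexCount];
        active[source] = true;
        boolean anyActive = true;

        while (anyActive) {
            // Scatter: each active vertex sends (own distance + edge weight) to its targets.
            double[] inbox = new double[vertexCount];
            Arrays.fill(inbox, Double.POSITIVE_INFINITY);
            for (double[] edge : edges) {
                int from = (int) edge[0];
                int to = (int) edge[1];
                if (active[from]) {
                    inbox[to] = Math.min(inbox[to], distance[from] + edge[2]);
                }
            }

            // Gather: a vertex adopts the best candidate and stays active only if it improved.
            anyActive = false;
            for (int v = 0; v < vertexCount; v++) {
                active[v] = inbox[v] < distance[v];
                if (active[v]) {
                    distance[v] = inbox[v];
                    anyActive = true;
                }
            }
        }
        return distance;
    }

    public static void main(String[] args) {
        double[][] edges = {{0, 1, 4}, {0, 2, 1}, {2, 1, 2}, {1, 3, 5}};
        System.out.println(Arrays.toString(shortestPaths(4, edges, 0)));
    }
}
```

The iteration stops once a superstep produces no improvement, which mirrors the idea that only vertices whose value changed need to send messages in the next round.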
public interface GraphAlgorithm<K, VV, EV, T> extends Serializable {
    DataSet<T> run(Graph<K, VV, EV> input) throws Exception;
}

Core Flink and Gelly types used throughout the API:
// Flink Graph API
class Graph<K, VV, EV> {
    // Graph operations and transformations
}

class Vertex<K, VV> {
    public K getId();
    public VV getValue();
}

class Edge<K, EV> {
    public K getSource();
    public K getTarget();
    public EV getValue();
}

// Flink execution environment
class ExecutionEnvironment {
    public static ExecutionEnvironment getExecutionEnvironment();
}

// Flink value types for serialization efficiency
class LongValue implements CopyableValue<LongValue> {
    public LongValue(long value);
    public long getValue();
    public void setValue(long value);
}

class NullValue implements Value {
    public static final NullValue getInstance();
}

// Additional Flink value types for type translation
class IntValue implements CopyableValue<IntValue> {
    public IntValue(int value);
    public int getValue();
}

class StringValue implements CopyableValue<StringValue> {
    public StringValue(String value);
    public String getValue();
}

class ByteValue implements CopyableValue<ByteValue> {
    public ByteValue(byte value);
    public byte getValue();
}

class ShortValue implements CopyableValue<ShortValue> {
    public ShortValue(short value);
    public short getValue();
}

// Result wrapper for algorithm outputs
interface PrintableResult {
    String toString();
}

// Result class for algorithm outputs with vertex ID and computed value
class Result<K> implements PrintableResult {
    public K getVertexId0();
    public K getVertexId1(); // for pairwise results
    public Double getScore(); // computed result value
}

// Parameter parsing
class ParameterTool {
    public static ParameterTool fromArgs(String[] args);
    public String get(String key);
    public String getRequired(String key) throws RuntimeException;
    public boolean has(String key);
}

// Exception types
class ProgramParametrizationException extends RuntimeException {
    public ProgramParametrizationException(String message);
}