Gelly: Flink Graph API - A comprehensive graph processing library for Apache Flink
npx @tessl/cli install tessl/maven-org-apache-flink--flink-gelly-2-10@1.3.0

Gelly is Apache Flink's graph API library. It provides methods and utilities for building graph analysis applications on Flink. Developers can create, transform, and modify graphs with high-level functions similar to those of the batch processing APIs, using both vertex-centric and edge-centric operations.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly_2.10</artifactId>
<version>1.3.3</version>
</dependency>

import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.Edge;
import org.apache.flink.api.java.ExecutionEnvironment;

For specific functionality:
// Iterative processing models
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.spargel.ScatterFunction;
// Algorithms
import org.apache.flink.graph.library.ConnectedComponents;
import org.apache.flink.graph.library.SingleSourceShortestPaths;
import org.apache.flink.graph.library.link_analysis.PageRank;
// Analytics
import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.Edge;
import org.apache.flink.types.NullValue;
import java.util.Arrays;
import java.util.List;
// Set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create vertices
List<Vertex<Long, Double>> vertices = Arrays.asList(
new Vertex<>(1L, 1.0),
new Vertex<>(2L, 2.0),
new Vertex<>(3L, 3.0)
);
// Create edges
List<Edge<Long, NullValue>> edges = Arrays.asList(
new Edge<>(1L, 2L, NullValue.getInstance()),
new Edge<>(2L, 3L, NullValue.getInstance()),
new Edge<>(3L, 1L, NullValue.getInstance())
);
// Create graph
Graph<Long, Double, NullValue> graph = Graph.fromCollection(vertices, edges, env);
// Basic operations
DataSet<Vertex<Long, Double>> filteredVertices = graph
.filterOnVertices(vertex -> vertex.getValue() > 1.5)
.getVertices();
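// Print the result. print() is a sink and triggers execution itself,
// so a separate env.execute() call is not needed here.
filteredVertices.print();

Gelly follows a layered architecture built on Flink's DataSet API: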
Graph<K, VV, EV> class with generic types for keys (K), vertex values (VV), and edge values (EV)
Vertex<K, V> and Edge<K, V> classes extending Flink tuples for efficient serialization

This design enables scalable graph processing on distributed Flink clusters while maintaining type safety and integration with Flink's broader ecosystem.
Core functionality for creating graphs from various data sources (collections, DataSets, CSV files) and transforming graph structure and data. Includes vertex and edge filtering, subgraph extraction, and graph set operations.
// Static factory methods
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(
Collection<Vertex<K, VV>> vertices,
Collection<Edge<K, EV>> edges,
ExecutionEnvironment context)
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(
DataSet<Vertex<K, VV>> vertices,
DataSet<Edge<K, EV>> edges,
ExecutionEnvironment context)
public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
DataSet<Edge<K, EV>> edges,
ExecutionEnvironment context)
// Transformation methods
public <NV> Graph<K, NV, EV> mapVertices(MapFunction<Vertex<K, VV>, NV> mapper)
public <NE> Graph<K, VV, NE> mapEdges(MapFunction<Edge<K, EV>, NE> mapper)
public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter)
public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter)
public Graph<K, VV, EV> subgraph(
FilterFunction<Vertex<K, VV>> vertexFilter,
FilterFunction<Edge<K, EV>> edgeFilter)
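To make the filtering methods above more concrete, here is a minimal plain-Java sketch of the idea behind `subgraph`: keep the vertices that pass the vertex filter, then keep only the edges that pass the edge filter and whose two endpoints both survived. It deliberately does not use Flink, so it runs without a cluster. The `SubgraphSketch` and `SimpleEdge` names are hypothetical, and an in-memory `Map` stands in for a distributed `DataSet`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class SubgraphSketch {

    /** Hypothetical stand-in for an edge: source ID, target ID, and a weight. */
    public static final class SimpleEdge {
        public final long source;
        public final long target;
        public final double weight;

        public SimpleEdge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Keeps only the vertices (ID -> value) accepted by the vertex filter. */
    public static Map<Long, Double> filterVertices(
            Map<Long, Double> vertices, Predicate<Map.Entry<Long, Double>> vertexFilter) {
        Map<Long, Double> kept = new LinkedHashMap<>();
        for (Map.Entry<Long, Double> v : vertices.entrySet()) {
            if (vertexFilter.test(v)) {
                kept.put(v.getKey(), v.getValue());
            }
        }
        return kept;
    }

    /** Keeps edges that pass the edge filter and whose endpoints both survived. */
    public static List<SimpleEdge> filterEdges(
            List<SimpleEdge> edges,
            Map<Long, Double> keptVertices,
            Predicate<SimpleEdge> edgeFilter) {
        List<SimpleEdge> kept = new ArrayList<>();
        for (SimpleEdge e : edges) {
            boolean endpointsPresent =
                    keptVertices.containsKey(e.source) && keptVertices.containsKey(e.target);
            if (endpointsPresent && edgeFilter.test(e)) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<Long, Double> vertices = new LinkedHashMap<>();
        vertices.put(1L, 1.0);
        vertices.put(2L, 2.0);
        vertices.put(3L, 3.0);
        List<SimpleEdge> edges = Arrays.asList(
                new SimpleEdge(1L, 2L, 0.5),
                new SimpleEdge(2L, 3L, 1.5),
                new SimpleEdge(3L, 1L, 2.5));

        Map<Long, Double> keptVertices = filterVertices(vertices, v -> v.getValue() > 1.5);
        List<SimpleEdge> keptEdges = filterEdges(edges, keptVertices, e -> e.weight > 1.0);

        System.out.println(keptVertices.keySet() + " vertices, " + keptEdges.size() + " edge(s)");
    }
}
```

In this sample, vertex 1 is filtered out, so both edges touching it disappear even though one of them would pass the edge filter on its own.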
Three distributed iterative computation models for implementing graph algorithms: Vertex-centric (Pregel), Scatter-Gather, and Gather-Sum-Apply. Each model provides different abstractions for message-passing graph computations.
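As a rough illustration of the message-passing style these models share, the following plain-Java sketch simulates scatter-gather supersteps for single-source shortest paths. It is not Gelly code and makes several simplifying assumptions: everything lives in memory, only the source vertex starts active, messages to the same target are combined by taking the minimum, and every edge endpoint and the source ID appear in `vertexIds`. The `ScatterGatherSketch` and `WeightedEdge` names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ScatterGatherSketch {

    /** Hypothetical weighted edge used only by this sketch. */
    public static final class WeightedEdge {
        public final long source;
        public final long target;
        public final double weight;

        public WeightedEdge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Returns the shortest known distance from sourceId to every vertex. */
    public static Map<Long, Double> shortestPaths(
            Set<Long> vertexIds, List<WeightedEdge> edges, long sourceId, int maxIterations) {
        Map<Long, Double> distance = new HashMap<>();
        for (long id : vertexIds) {
            distance.put(id, id == sourceId ? 0.0 : Double.POSITIVE_INFINITY);
        }
        // Only vertices whose value changed in the previous superstep send messages.
        Set<Long> active = new HashSet<>();
        active.add(sourceId);

        for (int superstep = 0; superstep < maxIterations && !active.isEmpty(); superstep++) {
            // Scatter: each active vertex sends a candidate distance along its out-edges.
            Map<Long, Double> inbox = new HashMap<>();
            for (WeightedEdge e : edges) {
                if (active.contains(e.source)) {
                    inbox.merge(e.target, distance.get(e.source) + e.weight, Double::min);
                }
            }
            // Gather: a receiving vertex updates its value only if the message improves it.
            Set<Long> changed = new HashSet<>();
            for (Map.Entry<Long, Double> message : inbox.entrySet()) {
                if (message.getValue() < distance.get(message.getKey())) {
                    distance.put(message.getKey(), message.getValue());
                    changed.add(message.getKey());
                }
            }
            active = changed;
        }
        return distance;
    }

    public static void main(String[] args) {
        Set<Long> ids = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        List<WeightedEdge> edges = Arrays.asList(
                new WeightedEdge(1L, 2L, 1.0),
                new WeightedEdge(2L, 3L, 2.0),
                new WeightedEdge(1L, 3L, 5.0));
        System.out.println(shortestPaths(ids, edges, 1L, 10));
    }
}
```

The loop stops early once no vertex changes, which is why `maxIterations` acts as an upper bound rather than a fixed iteration count in this sketch.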
// Vertex-centric (Pregel) iteration
public <M> Graph<K, VV, EV> runVertexCentricIteration(
ComputeFunction<K, VV, EV, M> computeFunction,
MessageCombiner<K, M> combiner,
int maxIterations)
// Scatter-Gather iteration
public <M> Graph<K, VV, EV> runScatterGatherIteration(
ScatterFunction<K, VV, M, EV> scatterFunction,
GatherFunction<K, VV, M> gatherFunction,
int maxIterations)
// Gather-Sum-Apply iteration
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
GatherFunction<VV, EV, M> gatherFunction,
SumFunction<VV, EV, M> sumFunction,
ApplyFunction<K, VV, M> applyFunction,
int maxIterations)

Pre-implemented algorithms for common graph analysis tasks, including shortest paths, PageRank, connected components, community detection, and clustering. The algorithms are optimized for distributed execution on Flink clusters.
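To show what parameters such as a damping factor and an iteration cap control in an algorithm like PageRank, here is a textbook power-iteration sketch in plain Java. It is not Gelly's implementation and may differ from it in details such as convergence checks or the treatment of vertices without out-edges. Here, as an assumption of the sketch, such vertices spread their rank evenly over all vertices. The `PageRankSketch` name and the 0-based integer vertex IDs are illustrative choices.

```java
import java.util.Arrays;
import java.util.List;

public class PageRankSketch {

    /**
     * Textbook PageRank over vertices numbered 0..vertexCount-1.
     * Each edge is {source, target}.
     */
    public static double[] pageRank(
            int vertexCount, List<int[]> edges, double dampingFactor, int maxIterations) {
        int[] outDegree = new int[vertexCount];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }
        double[] rank = new double[vertexCount];
        Arrays.fill(rank, 1.0 / vertexCount);

        for (int i = 0; i < maxIterations; i++) {
            // Rank held by vertices without out-edges is shared by everyone.
            double danglingMass = 0.0;
            for (int v = 0; v < vertexCount; v++) {
                if (outDegree[v] == 0) {
                    danglingMass += rank[v];
                }
            }
            double[] next = new double[vertexCount];
            // Teleport term plus an even share of the dangling rank.
            Arrays.fill(next, (1.0 - dampingFactor) / vertexCount
                    + dampingFactor * danglingMass / vertexCount);
            // Each vertex passes its damped rank evenly along its out-edges.
            for (int[] e : edges) {
                next[e[1]] += dampingFactor * rank[e[0]] / outDegree[e[0]];
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        List<int[]> cycle = Arrays.asList(new int[]{0, 1}, new int[]{1, 2}, new int[]{2, 0});
        System.out.println(Arrays.toString(pageRank(3, cycle, 0.85, 20)));
    }
}
```

On a symmetric input such as a directed cycle, every vertex ends up with the same score, and the scores always sum to 1 in this formulation.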
// Algorithm execution
public <T> T run(GraphAlgorithm<K, VV, EV, T> algorithm)
// Example algorithm constructors
public ConnectedComponents(int maxIterations)
public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations)
public PageRank(double dampingFactor, int maxIterations)
public GSAConnectedComponents(int maxIterations)
public CommunityDetection(int maxIterations, double delta)

Analytics framework (ASM) for computing graph statistics, degree distributions, and structural metrics. It covers both directed and undirected graphs and collects results efficiently through accumulators.
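As a small illustration of the kind of statistic involved, the sketch below computes in-degrees, out-degrees, and total degrees in plain Java. It does not use Flink or the ASM classes. The `DegreeSketch` name is hypothetical, edges are modeled as `{source, target}` arrays, and all edge endpoints are assumed to be listed in `vertexIds`. Vertices with no matching edges are reported with degree 0, which is a choice made for this sketch.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class DegreeSketch {

    /** Counts edges per vertex; endpointIndex 0 counts out-edges, 1 counts in-edges. */
    private static Map<Long, Long> count(
            Collection<Long> vertexIds, List<long[]> edges, int endpointIndex) {
        Map<Long, Long> degrees = new TreeMap<>();
        for (Long id : vertexIds) {
            degrees.put(id, 0L); // vertices without matching edges keep degree 0
        }
        for (long[] e : edges) {
            degrees.merge(e[endpointIndex], 1L, Long::sum);
        }
        return degrees;
    }

    public static Map<Long, Long> outDegrees(Collection<Long> vertexIds, List<long[]> edges) {
        return count(vertexIds, edges, 0);
    }

    public static Map<Long, Long> inDegrees(Collection<Long> vertexIds, List<long[]> edges) {
        return count(vertexIds, edges, 1);
    }

    /** Total degree is the sum of in-degree and out-degree. */
    public static Map<Long, Long> degrees(Collection<Long> vertexIds, List<long[]> edges) {
        Map<Long, Long> total = outDegrees(vertexIds, edges);
        inDegrees(vertexIds, edges).forEach((id, d) -> total.merge(id, d, Long::sum));
        return total;
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(1L, 2L, 3L, 4L);
        List<long[]> edges = Arrays.asList(
                new long[]{1, 2}, new long[]{2, 3}, new long[]{3, 1}, new long[]{1, 3});
        System.out.println("out: " + outDegrees(ids, edges));
        System.out.println("in:  " + inDegrees(ids, edges));
        System.out.println("all: " + degrees(ids, edges));
    }
}
```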
// Analytics execution
public <T> GraphAnalytic<K, VV, EV, T> run(GraphAnalytic<K, VV, EV, T> analytic)
// Example analytics
public DataSet<Tuple2<K, LongValue>> inDegrees()
public DataSet<Tuple2<K, LongValue>> outDegrees()
public DataSet<Tuple2<K, LongValue>> getDegrees()
public long numberOfVertices()
public long numberOfEdges()
// ASM analytics interface
public interface DataSetAnalytic<T, R> {
R getResult();
R execute();
DataSetAnalytic<T, R> run(DataSet<T> input);
}

Utilities for creating synthetic graphs for testing, benchmarking, and algorithm development. These include complete graphs, random graphs, paths, hypercubes, and configurable generators with various probability distributions.
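For intuition about what such generators produce, this plain-Java sketch builds the edge lists of a complete graph and a hypercube from their textbook definitions, emitting each connection as a pair of directed edges. It is not Gelly's generator code, and the real generators may differ in details such as vertex numbering or edge direction. The `GeneratorSketch` name and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class GeneratorSketch {

    /** Complete graph: every vertex has an edge to every other vertex. */
    public static List<long[]> completeGraph(long vertexCount) {
        List<long[]> edges = new ArrayList<>();
        for (long source = 0; source < vertexCount; source++) {
            for (long target = 0; target < vertexCount; target++) {
                if (source != target) {
                    edges.add(new long[]{source, target});
                }
            }
        }
        return edges;
    }

    /** Hypercube: vertices are bit strings; neighbors differ in exactly one bit. */
    public static List<long[]> hypercubeGraph(int dimensions) {
        List<long[]> edges = new ArrayList<>();
        long vertexCount = 1L << dimensions;
        for (long vertex = 0; vertex < vertexCount; vertex++) {
            for (int bit = 0; bit < dimensions; bit++) {
                edges.add(new long[]{vertex, vertex ^ (1L << bit)});
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        System.out.println("complete(4):  " + completeGraph(4).size() + " directed edges");
        System.out.println("hypercube(3): " + hypercubeGraph(3).size() + " directed edges");
    }
}
```

Under these definitions a complete graph on n vertices has n(n-1) directed edges and a d-dimensional hypercube has d·2^d, which makes generated graphs convenient for checking algorithm output against known values.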
// Generator base class
public abstract class AbstractGraphGenerator<K, VV, EV> {
public Graph<K, VV, EV> generate();
public AbstractGraphGenerator<K, VV, EV> setParallelism(int parallelism);
}
// Specific generators
public CompleteGraph(ExecutionEnvironment env, long numVertices)
public EchoGraph(ExecutionEnvironment env, long numVertices)
public EmptyGraph(ExecutionEnvironment env, long numVertices)
public PathGraph(ExecutionEnvironment env, long numVertices)
public HypercubeGraph(ExecutionEnvironment env, long dimensions)

Methods for accessing graph data, computing neighborhoods, performing joins, and integrating with Flink's DataSet API. These include graph validation, format conversion, and utility functions for common operations.
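The neighborhood idea can be sketched in plain Java as grouping edges per vertex according to a direction and then reducing each group. The example below sums edge weights per vertex. It is an in-memory model rather than Gelly code. The `Direction` enum mirrors the `IN`, `OUT`, and `ALL` values of `EdgeDirection`, while `NeighborhoodSketch`, `WeightedEdge`, and `sumEdgeWeights` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class NeighborhoodSketch {

    /** Stand-in for the direction argument: incoming, outgoing, or both. */
    public enum Direction { IN, OUT, ALL }

    /** Hypothetical weighted edge used only by this sketch. */
    public static final class WeightedEdge {
        public final long source;
        public final long target;
        public final double weight;

        public WeightedEdge(long source, long target, double weight) {
            this.source = source;
            this.target = target;
            this.weight = weight;
        }
    }

    /** Sums edge weights per vertex over the edges selected by the direction. */
    public static Map<Long, Double> sumEdgeWeights(List<WeightedEdge> edges, Direction direction) {
        Map<Long, Double> sums = new TreeMap<>();
        for (WeightedEdge e : edges) {
            if (direction == Direction.OUT || direction == Direction.ALL) {
                sums.merge(e.source, e.weight, Double::sum); // edge grouped under its source
            }
            if (direction == Direction.IN || direction == Direction.ALL) {
                sums.merge(e.target, e.weight, Double::sum); // edge grouped under its target
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<WeightedEdge> edges = Arrays.asList(
                new WeightedEdge(1L, 2L, 0.5),
                new WeightedEdge(1L, 3L, 1.5),
                new WeightedEdge(2L, 3L, 2.0));
        for (Direction direction : Direction.values()) {
            System.out.println(direction + ": " + sumEdgeWeights(edges, direction));
        }
    }
}
```

In this sketch a vertex with no edges in the chosen direction simply does not appear in the result.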
// Data access
public DataSet<Vertex<K, VV>> getVertices()
public DataSet<Edge<K, EV>> getEdges()
public DataSet<Triplet<K, VV, EV>> getTriplets()
public DataSet<K> getVertexIds()
public DataSet<Tuple2<K, K>> getEdgeIds()
// Neighborhood operations
public <T> DataSet<T> groupReduceOnEdges(
EdgesFunction<K, EV, T> edgesFunction,
EdgeDirection direction)
public <T> DataSet<T> groupReduceOnNeighbors(
NeighborsFunction<K, VV, EV, T> neighborsFunction,
EdgeDirection direction)
// Graph validation
public Boolean validate(GraphValidator<K, VV, EV> validator)