Gelly: Flink Graph API - A comprehensive graph processing library for Apache Flink
—
Gelly provides three distributed iterative computation models for implementing graph algorithms: Vertex-centric (Pregel), Scatter-Gather, and Gather-Sum-Apply. Each model provides different abstractions for message-passing graph computations on distributed Flink clusters.
The vertex-centric model follows Google's Pregel programming paradigm where computation is performed at each vertex by processing incoming messages and sending messages to neighbors.
public <M> Graph<K, VV, EV> runVertexCentricIteration(
ComputeFunction<K, VV, EV, M> computeFunction,
MessageCombiner<K, M> combiner,
int maxIterations)
public <M> Graph<K, VV, EV> runVertexCentricIteration(
ComputeFunction<K, VV, EV, M> computeFunction,
MessageCombiner<K, M> combiner,
int maxIterations,
VertexCentricConfiguration parameters)

Defines the computation performed at each vertex in each iteration.
public abstract class ComputeFunction<K, VV, EV, M> implements Serializable {
public abstract void compute(
Vertex<K, VV> vertex,
MessageIterator<M> messages) throws Exception;
// Message sending methods
public void sendMessageToAllNeighbors(M message)
public void sendMessageTo(K target, M message)
public Iterable<Edge<K, EV>> getEdges()
// Vertex value update
public void setNewVertexValue(VV newValue)
// Utility methods
public long getNumberOfVertices()
public int getSuperstepNumber()
public <T> Aggregator<T> getIterationAggregator(String name)
public void preSuperstep() throws Exception
public void postSuperstep() throws Exception
}

Optional combiner to reduce message traffic by combining messages sent to the same vertex.
public interface MessageCombiner<K, M> extends java.io.Serializable {
void combineMessages(MessageIterator<M> messages, Collector<M> out) throws Exception;
}

Iterator for processing incoming messages at a vertex.
public interface MessageIterator<M> extends Iterator<M> {
boolean hasNext();
M next();
}

Configuration options for vertex-centric iterations.
public class VertexCentricConfiguration {
public VertexCentricConfiguration setName(String name)
public VertexCentricConfiguration setParallelism(int parallelism)
public VertexCentricConfiguration setSolutionSetUnmanagedMemory(boolean unmanaged)
public VertexCentricConfiguration registerAggregator(String name, Aggregator<?> aggregator)
}

Usage Example:
// Single Source Shortest Path using Vertex-Centric iteration
/**
 * Single Source Shortest Paths compute function for the vertex-centric model.
 *
 * <p>In every superstep a vertex takes the minimum of the distances it received
 * (the source vertex additionally considers distance 0). If that minimum is
 * smaller than the value currently stored on the vertex, the vertex stores it
 * and sends {@code minimum + edge value} to the target of each outgoing edge.
 *
 * <p>NOTE(review): this assumes vertex values start out larger than any real
 * distance (e.g. {@code Double.POSITIVE_INFINITY}); confirm against the code
 * that builds the input graph.
 */
public class SSSpComputeFunction extends ComputeFunction<Long, Double, Double, Double> {

    /** Source vertex used by the no-argument constructor. */
    private static final long DEFAULT_SOURCE_ID = 1L;

    /** ID of the vertex from which distances are measured. */
    private final long sourceId;

    /** Creates a compute function whose source is vertex {@code 1}. */
    public SSSpComputeFunction() {
        this(DEFAULT_SOURCE_ID);
    }

    /**
     * Creates a compute function for an arbitrary source vertex.
     *
     * @param sourceId ID of the vertex from which distances are measured
     */
    public SSSpComputeFunction(long sourceId) {
        this.sourceId = sourceId;
    }

    /**
     * Lowers the vertex distance if a shorter path was found and, in that case,
     * propagates the improved distance along every outgoing edge.
     *
     * @param vertex   the vertex being processed
     * @param messages candidate distances sent to this vertex
     * @throws Exception declared by the overridden method
     */
    @Override
    public void compute(Vertex<Long, Double> vertex,
                        MessageIterator<Double> messages) throws Exception {
        // The source is at distance 0 from itself; every other vertex starts unbounded.
        double minDistance = vertex.getId().equals(sourceId) ? 0.0 : Double.POSITIVE_INFINITY;

        // Update distance with minimum from incoming messages
        while (messages.hasNext()) {
            minDistance = Math.min(minDistance, messages.next());
        }

        // Only a strict improvement is stored and propagated.
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            // Send updated distance + edge weight to all neighbors
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}
// Run the algorithm
Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
new SSSpComputeFunction(),
new MinMessageCombiner(),
maxIterations
);

The scatter-gather model separates message sending (scatter) and vertex update (gather) into distinct phases.
public <M> Graph<K, VV, EV> runScatterGatherIteration(
ScatterFunction<K, VV, M, EV> scatterFunction,
GatherFunction<K, VV, M> gatherFunction,
int maxIterations)
public <M> Graph<K, VV, EV> runScatterGatherIteration(
ScatterFunction<K, VV, M, EV> scatterFunction,
GatherFunction<K, VV, M> gatherFunction,
int maxIterations,
ScatterGatherConfiguration parameters)

Defines how messages are sent to neighbors in the scatter phase.
public abstract class ScatterFunction<K, VV, M, EV> extends AbstractRichFunction {
public abstract void sendMessages(Vertex<K, VV> vertex, Collector<Tuple2<K, M>> out) throws Exception;
// Access to outgoing edges
public Iterable<Edge<K, EV>> getEdges()
// Utility methods
public long getNumberOfVertices()
public int getSuperstepNumber()
public <T> Aggregator<T> getIterationAggregator(String name)
public void preSuperstep() throws Exception
public void postSuperstep() throws Exception
}

Defines how vertex values are updated based on incoming messages in the gather phase.
public abstract class GatherFunction<K, VV, M> extends AbstractRichFunction {
public abstract VV updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;
// Utility methods
public long getNumberOfVertices()
public int getSuperstepNumber()
public <T> Aggregator<T> getIterationAggregator(String name)
public void preSuperstep() throws Exception
public void postSuperstep() throws Exception
}

Configuration options for scatter-gather iterations.
public class ScatterGatherConfiguration {
public ScatterGatherConfiguration setName(String name)
public ScatterGatherConfiguration setParallelism(int parallelism)
public ScatterGatherConfiguration setSolutionSetUnmanagedMemory(boolean unmanaged)
public ScatterGatherConfiguration registerAggregator(String name, Aggregator<?> aggregator)
public ScatterGatherConfiguration setOptDegrees(boolean optDegrees)
}

Usage Example:
// PageRank using Scatter-Gather
/**
 * Scatter phase of the PageRank example: splits the vertex's current rank
 * evenly across its outgoing edges and emits one message per edge target.
 */
public class PageRankScatter extends ScatterFunction<Long, Double, Double, NullValue> {

    /**
     * Emits {@code rank / outDegree} to the target of every outgoing edge.
     *
     * @param vertex the vertex whose rank is distributed
     * @param out    collector receiving {@code (targetId, rankShare)} pairs
     */
    @Override
    public void sendMessages(Vertex<Long, Double> vertex, Collector<Tuple2<Long, Double>> out) {
        // Walk the edges a single time and remember the targets: Gelly's
        // ScatterFunction allows getEdges() to be called only once per
        // sendMessages invocation (a second call fails with
        // IllegalStateException), so counting and sending cannot each
        // iterate getEdges() separately.
        java.util.List<Long> targets = new java.util.ArrayList<>();
        for (Edge<Long, NullValue> edge : getEdges()) {
            targets.add(edge.getTarget());
        }

        // No outgoing edges: nothing to send, and no division by zero.
        if (targets.isEmpty()) {
            return;
        }

        double rankToSend = vertex.getValue() / targets.size();
        for (Long target : targets) {
            out.collect(new Tuple2<>(target, rankToSend));
        }
    }
}
/**
 * Gather phase of the PageRank example: adds up the rank shares received by a
 * vertex and combines them with the damping term to form the new rank.
 */
public class PageRankGather extends GatherFunction<Long, Double, Double> {
    private final double dampingFactor = 0.85;

    /**
     * Computes the new rank of a vertex from its incoming rank shares.
     *
     * @param vertex     the vertex being updated
     * @param inMessages rank shares sent to this vertex
     * @return {@code (1 - d) / numberOfVertices + d * sum(inMessages)}
     */
    @Override
    public Double updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
        double receivedRank = 0.0;
        for (; inMessages.hasNext(); ) {
            receivedRank += inMessages.next();
        }

        // Same arithmetic as a single expression, split into its two summands.
        double baseShare = (1.0 - dampingFactor) / getNumberOfVertices();
        double dampedRank = dampingFactor * receivedRank;
        return baseShare + dampedRank;
    }
}
// Run PageRank
Graph<Long, Double, NullValue> result = graph.runScatterGatherIteration(
new PageRankScatter(),
new PageRankGather(),
maxIterations
);

The GSA model provides three distinct phases: gather information from neighbors, sum/aggregate the gathered information, and apply the result to update vertex values.
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
GatherFunction<VV, EV, M> gatherFunction,
SumFunction<VV, EV, M> sumFunction,
ApplyFunction<K, VV, M> applyFunction,
int maxIterations)
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
GatherFunction<VV, EV, M> gatherFunction,
SumFunction<VV, EV, M> sumFunction,
ApplyFunction<K, VV, M> applyFunction,
int maxIterations,
GSAConfiguration parameters)

Gathers information from each neighbor edge.
public abstract class GatherFunction<VV, EV, M> extends AbstractRichFunction {
public abstract M gather(Neighbor<VV, EV> neighbor) throws Exception;
// Utility methods
public long getNumberOfVertices()
public int getSuperstepNumber()
public <T> Aggregator<T> getIterationAggregator(String name)
public void preSuperstep() throws Exception
public void postSuperstep() throws Exception
}

Aggregates all gathered values for a vertex.
public abstract class SumFunction<VV, EV, M> extends AbstractRichFunction {
public abstract M sum(M value1, M value2) throws Exception;
}

Applies the aggregated result to update the vertex value.
public abstract class ApplyFunction<K, VV, M> extends AbstractRichFunction {
public abstract VV apply(VV currentValue, M sum) throws Exception;
// Utility methods
public long getNumberOfVertices()
public int getSuperstepNumber()
public <T> Aggregator<T> getIterationAggregator(String name)
public void preSuperstep() throws Exception
public void postSuperstep() throws Exception
}

Represents a neighbor vertex and connecting edge in GSA iterations.
public class Neighbor<VV, EV> {
public VV getNeighborValue()
public EV getEdgeValue()
}

Configuration options for GSA iterations.
public class GSAConfiguration {
public GSAConfiguration setName(String name)
public GSAConfiguration setParallelism(int parallelism)
public GSAConfiguration setSolutionSetUnmanagedMemory(boolean unmanaged)
public GSAConfiguration registerAggregator(String name, Aggregator<?> aggregator)
public GSAConfiguration setOptDegrees(boolean optDegrees)
}

Usage Example:
// Connected Components using GSA
/**
 * Gather phase of the Connected Components example: forwards the value held
 * by the neighboring vertex.
 */
public class CCGather extends GatherFunction<Long, NullValue, Long> {

    /**
     * @param neighbor the neighbor vertex together with the connecting edge
     * @return the neighbor's current value
     */
    @Override
    public Long gather(Neighbor<Long, NullValue> neighbor) {
        Long neighborValue = neighbor.getNeighborValue();
        return neighborValue;
    }
}
/**
 * Sum phase of the Connected Components example: reduces two gathered values
 * to the smaller one.
 */
public class CCSum extends SumFunction<Long, NullValue, Long> {

    /**
     * @param value1 first gathered value
     * @param value2 second gathered value
     * @return the minimum of the two values
     */
    @Override
    public Long sum(Long value1, Long value2) {
        long first = value1;
        long second = value2;
        return Math.min(first, second);
    }
}
/**
 * Apply phase of the Connected Components example: keeps the smaller of the
 * vertex's current value and the aggregated neighbor value.
 */
public class CCApply extends ApplyFunction<Long, Long, Long> {

    /**
     * @param currentValue value currently stored on the vertex
     * @param sum          aggregated value produced by the sum phase
     * @return the minimum of {@code currentValue} and {@code sum}
     */
    @Override
    public Long apply(Long currentValue, Long sum) {
        long current = currentValue;
        long aggregated = sum;
        return Math.min(current, aggregated);
    }
}
// Run Connected Components
Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(
new CCGather(),
new CCSum(),
new CCApply(),
maxIterations
);

All iteration models support convergence detection through aggregators:
// In compute/scatter/gather/apply functions
LongSumAggregator changedVertices = getIterationAggregator("changed");
if (valueChanged) {
changedVertices.aggregate(1L);
}
// Register the aggregator on the iteration configuration
configuration.registerAggregator("changed", new LongSumAggregator());

Use setOptDegrees(true) for degree-based algorithms.

All user-defined functions can throw exceptions that will be propagated and cause job failure:
/**
 * Example of failing fast inside a user-defined compute function.
 *
 * <p>The parameter list matches {@code ComputeFunction.compute(Vertex, MessageIterator)};
 * results are published through the function's own methods rather than a collector argument.
 *
 * @param vertex   the vertex being processed
 * @param messages messages sent to this vertex
 * @throws IllegalArgumentException if the vertex value is negative
 * @throws Exception declared by the overridden method
 */
@Override
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages)
        throws Exception {
    if (vertex.getValue() < 0) {
        throw new IllegalArgumentException("Negative vertex value: " + vertex.getValue());
    }
    // ... computation logic
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gelly-2-10