tessl/maven-org-apache-flink--flink-gelly-2-10

Gelly: Flink Graph API - A comprehensive graph processing library for Apache Flink

Iterative Processing Models

Gelly provides three distributed iterative computation models for implementing graph algorithms: Vertex-centric (Pregel), Scatter-Gather, and Gather-Sum-Apply. Each model provides different abstractions for message-passing graph computations on distributed Flink clusters.

Capabilities

Vertex-Centric (Pregel) Model

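The vertex-centric model follows Google's Pregel programming paradigm. Computation proceeds in supersteps. In each superstep, a vertex processes the messages sent to it in the previous superstep, may update its value, and sends messages to its neighbors (or to any other vertex whose ID it knows) for the next superstep.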

public <M> Graph<K, VV, EV> runVertexCentricIteration(
    ComputeFunction<K, VV, EV, M> computeFunction,
    MessageCombiner<K, M> combiner,
    int maxIterations)

public <M> Graph<K, VV, EV> runVertexCentricIteration(
    ComputeFunction<K, VV, EV, M> computeFunction,
    MessageCombiner<K, M> combiner,
    int maxIterations,
    VertexCentricConfiguration parameters)

ComputeFunction<K, VV, EV, M>

Defines the computation performed at each vertex in each iteration.

public abstract class ComputeFunction<K, VV, EV, M> implements Serializable {
    public abstract void compute(
        Vertex<K, VV> vertex,
        MessageIterator<M> messages) throws Exception;

    // Message sending methods. getEdges() and sendMessageToAllNeighbors() are
    // mutually exclusive and may be called at most once per compute() call.
    public void sendMessageToAllNeighbors(M message)
    public void sendMessageTo(K target, M message)
    public Iterable<Edge<K, EV>> getEdges()

    // Vertex value update
    public void setNewVertexValue(VV newValue)

    // Utility methods
    public long getNumberOfVertices()
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}

MessageCombiner<K, M>

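Optional combiner that reduces message traffic by merging the messages sent to the same vertex before they are delivered.

public abstract class MessageCombiner<K, M> implements Serializable {
    public abstract void combineMessages(MessageIterator<M> messages) throws Exception;

    // Emits the combined message for the target vertex
    public final void sendCombinedMessage(M combinedMessage)
}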
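The effect of a minimum combiner can be modeled in a few lines of plain Java. The sketch below has no Flink dependency; `MinCombineSketch`, `Message`, and `combine` are hypothetical names chosen for illustration, not Gelly API. It folds all messages addressed to the same target vertex into one, which is what a combiner achieves before messages are shuffled across the network.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, framework-free model of a minimum message combiner.
final class MinCombineSketch {

    // A message in flight: target vertex ID plus payload.
    static final class Message {
        final long target;
        final double value;

        Message(long target, double value) {
            this.target = target;
            this.value = value;
        }
    }

    // Folds all messages with the same target into a single minimum message,
    // so each target vertex receives at most one message.
    static Map<Long, Double> combine(List<Message> outgoing) {
        Map<Long, Double> combined = new HashMap<>();
        for (Message m : outgoing) {
            combined.merge(m.target, m.value, Math::min);
        }
        return combined;
    }
}
```

Because a combiner may see only part of the messages for a vertex at a time, the combining logic should be associative and commutative, as minimum is.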

MessageIterator<M>

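Iterator over the messages received by a vertex. It also implements Iterable, so it can be used directly in a for-each loop.

public class MessageIterator<M> implements Iterator<M>, Iterable<M> {
    public boolean hasNext()
    public M next()
    public Iterator<M> iterator()
}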

VertexCentricConfiguration

Configuration options for vertex-centric iterations.

public class VertexCentricConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
}

Usage Example:

// Single Source Shortest Path using a vertex-centric iteration.
// Assumes vertex 1 is the source and all vertex values start at Double.POSITIVE_INFINITY.
public class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
    @Override
    public void compute(Vertex<Long, Double> vertex,
                        MessageIterator<Double> messages) throws Exception {

        double minDistance = (vertex.getId().equals(1L)) ? 0.0 : Double.POSITIVE_INFINITY;

        // Update distance with minimum from incoming messages
        while (messages.hasNext()) {
            minDistance = Math.min(minDistance, messages.next());
        }

        // If the distance improved, store it and propagate it to neighbors
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            // Send the updated distance plus the edge weight along each outgoing edge
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}

// Run the algorithm for at most maxIterations supersteps.
// MinMessageCombiner is a user-defined MessageCombiner (not shown) that
// forwards only the smallest distance sent to each vertex.
Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
    new SSSPComputeFunction(),
    new MinMessageCombiner(),
    maxIterations
);
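The superstep semantics that the example relies on can be traced with a single-machine model. The following plain-Java sketch (hypothetical class `PregelSsspSketch`, no Flink dependency) applies the same compute logic to every vertex: in this model, messages sent in one superstep are delivered in the next, after the first superstep only vertices with incoming messages run, and the loop stops when no messages are in flight or after `maxIterations` supersteps. It assumes a directed edge list with non-negative weights and all vertex values starting at positive infinity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-machine model of Pregel-style supersteps for SSSP.
final class PregelSsspSketch {

    // edges[i] = {source, target}; weights[i] is the weight of edge i.
    static double[] run(int numVertices, int[][] edges, double[] weights,
                        int sourceId, int maxIterations) {
        double[] value = new double[numVertices];
        Arrays.fill(value, Double.POSITIVE_INFINITY);

        // Messages delivered in the current superstep: target -> distances.
        Map<Integer, List<Double>> inbox = new HashMap<>();
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            Map<Integer, List<Double>> outbox = new HashMap<>();
            for (int v = 0; v < numVertices; v++) {
                List<Double> messages = inbox.get(v);
                // After superstep 1, only vertices with messages are active.
                if (superstep > 1 && messages == null) {
                    continue;
                }
                double minDistance = (v == sourceId) ? 0.0 : Double.POSITIVE_INFINITY;
                if (messages != null) {
                    for (double m : messages) {
                        minDistance = Math.min(minDistance, m);
                    }
                }
                if (minDistance < value[v]) {
                    value[v] = minDistance;
                    // Send distance + edge weight along each outgoing edge.
                    for (int e = 0; e < edges.length; e++) {
                        if (edges[e][0] == v) {
                            outbox.computeIfAbsent(edges[e][1], k -> new ArrayList<>())
                                  .add(minDistance + weights[e]);
                        }
                    }
                }
            }
            // No messages in flight: nothing left to compute.
            if (outbox.isEmpty()) {
                break;
            }
            inbox = outbox;
        }
        return value;
    }
}
```

For the edges 0->1 (1.0), 0->2 (4.0), 1->2 (2.0), 2->3 (1.0) with source 0, the sketch settles on distances 0, 1, 3, 4 after four supersteps; a vertex with no path from the source stays at infinity.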

Scatter-Gather Model

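The scatter-gather model splits each superstep into two phases: in the scatter phase, vertices send messages to their neighbors; in the gather phase, vertices update their values based on the messages they received.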

public <M> Graph<K, VV, EV> runScatterGatherIteration(
    ScatterFunction<K, VV, M, EV> scatterFunction,
    GatherFunction<K, VV, M> gatherFunction,
    int maxIterations)

public <M> Graph<K, VV, EV> runScatterGatherIteration(
    ScatterFunction<K, VV, M, EV> scatterFunction,
    GatherFunction<K, VV, M> gatherFunction,
    int maxIterations,
    ScatterGatherConfiguration parameters)

ScatterFunction<K, VV, M, EV>

Defines how messages are sent to neighbors in the scatter phase.

public abstract class ScatterFunction<K, VV, M, EV> implements Serializable {
    public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;

    // Message sending and access to outgoing edges. getEdges() and
    // sendMessageToAllNeighbors() are mutually exclusive and may be
    // called at most once per sendMessages() invocation.
    public void sendMessageTo(K target, M message)
    public void sendMessageToAllNeighbors(M message)
    public Iterable<Edge<K, EV>> getEdges()

    // Utility methods
    public long getNumberOfVertices()   // -1 unless setOptNumVertices(true) is set
    public long getInDegree()           // -1 unless setOptDegrees(true) is set
    public long getOutDegree()          // -1 unless setOptDegrees(true) is set
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}

GatherFunction<K, VV, M>

Defines how vertex values are updated from incoming messages in the gather phase. It is invoked for vertices that received messages and stores a new value by calling setNewVertexValue().

public abstract class GatherFunction<K, VV, M> implements Serializable {
    public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;

    // Vertex value update; call at most once per updateVertex() invocation
    public void setNewVertexValue(VV newValue)

    // Utility methods
    public long getNumberOfVertices()   // -1 unless setOptNumVertices(true) is set
    public long getInDegree()           // -1 unless setOptDegrees(true) is set
    public long getOutDegree()          // -1 unless setOptDegrees(true) is set
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}

ScatterGatherConfiguration

Configuration options for scatter-gather iterations.

public class ScatterGatherConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
    public void setOptNumVertices(boolean optNumVertices)
    public void setOptDegrees(boolean optDegrees)
}

Usage Example:

// PageRank using scatter-gather (simplified: dangling vertices are not handled).
// Assumes vertex values are initialized to 1 / numberOfVertices.
public class PageRankScatter extends ScatterFunction<Long, Double, Double, NullValue> {
    @Override
    public void sendMessages(Vertex<Long, Double> vertex) {
        // getEdges() may be called only once per invocation, so use the
        // precomputed out-degree instead of counting edges first
        long outDegree = getOutDegree();
        if (outDegree > 0) {
            sendMessageToAllNeighbors(vertex.getValue() / outDegree);
        }
    }
}

public class PageRankGather extends GatherFunction<Long, Double, Double> {
    private static final double DAMPING_FACTOR = 0.85;

    @Override
    public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
        double sum = 0.0;
        while (inMessages.hasNext()) {
            sum += inMessages.next();
        }
        setNewVertexValue((1.0 - DAMPING_FACTOR) / getNumberOfVertices() + DAMPING_FACTOR * sum);
    }
}

// Enable the degree and vertex-count options used above, then run PageRank
ScatterGatherConfiguration config = new ScatterGatherConfiguration();
config.setOptDegrees(true);
config.setOptNumVertices(true);

Graph<Long, Double, NullValue> result = graph.runScatterGatherIteration(
    new PageRankScatter(),
    new PageRankGather(),
    maxIterations,
    config
);
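The two phases, and the schedule that decides which vertices take part in them, can be modeled on one machine. In the plain-Java sketch below (hypothetical class `ScatterGatherPageRankSketch`, no Flink dependency), only vertices updated in the previous superstep scatter, only vertices that received messages are gathered, and out-degrees are computed once up front, playing the role of the `setOptDegrees(true)` option. Ranks start at 1 / numVertices and dangling vertices get no special treatment, so this is a simplified PageRank, not a reference implementation.

```java
import java.util.Arrays;

// Hypothetical single-machine model of scatter-gather PageRank supersteps.
final class ScatterGatherPageRankSketch {

    static final double DAMPING_FACTOR = 0.85;

    // edges[i] = {source, target}; ranks start at 1 / numVertices.
    static double[] run(int numVertices, int[][] edges, int maxIterations) {
        double[] rank = new double[numVertices];
        Arrays.fill(rank, 1.0 / numVertices);

        // Out-degrees computed once, like the degree option in the sketch's lead-in.
        int[] outDegree = new int[numVertices];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }

        // In the first superstep every vertex scatters.
        boolean[] updated = new boolean[numVertices];
        Arrays.fill(updated, true);

        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            // Scatter: vertices updated last superstep send rank / outDegree.
            double[] sum = new double[numVertices];
            boolean[] received = new boolean[numVertices];
            for (int[] e : edges) {
                if (updated[e[0]]) {
                    sum[e[1]] += rank[e[0]] / outDegree[e[0]];
                    received[e[1]] = true;
                }
            }
            // Gather: only vertices that received messages get a new rank.
            boolean anyUpdated = false;
            for (int v = 0; v < numVertices; v++) {
                if (received[v]) {
                    rank[v] = (1.0 - DAMPING_FACTOR) / numVertices + DAMPING_FACTOR * sum[v];
                    anyUpdated = true;
                }
            }
            updated = received;
            // No vertex updated: stop early.
            if (!anyUpdated) {
                break;
            }
        }
        return rank;
    }
}
```

On the cycle 0->1->2->0 every rank stays at about 1/3. On the edges 0->1, 1->0, 2->0, vertex 2 has no incoming edges, so in this model it never receives a message and keeps its initial rank.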

Gather-Sum-Apply (GSA) Model

The GSA model provides three distinct phases: gather information from neighbors, sum/aggregate the gathered information, and apply the result to update vertex values.

public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
    GatherFunction<VV, EV, M> gatherFunction,
    SumFunction<VV, EV, M> sumFunction,
    ApplyFunction<K, VV, M> applyFunction,
    int maxIterations)

public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
    GatherFunction<VV, EV, M> gatherFunction,
    SumFunction<VV, EV, M> sumFunction,
    ApplyFunction<K, VV, M> applyFunction,
    int maxIterations,
    GSAConfiguration parameters)

GatherFunction<VV, EV, M>

Computes a value from each neighbor and the edge that connects it.

public abstract class GatherFunction<VV, EV, M> implements Serializable {
    public abstract M gather(Neighbor<VV, EV> neighbor) throws Exception;

    // Utility methods
    public long getNumberOfVertices()
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}

SumFunction<VV, EV, M>

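Combines two gathered values into one. It is applied repeatedly to reduce all values gathered for a vertex to a single value, so it should be associative and commutative.

public abstract class SumFunction<VV, EV, M> implements Serializable {
    public abstract M sum(M value1, M value2) throws Exception;
}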

ApplyFunction<K, VV, M>

Applies the aggregated result to update the vertex value.

public abstract class ApplyFunction<K, VV, M> implements Serializable {
    // newValue is the summed value; currentValue is the vertex's current value
    public abstract void apply(M newValue, VV currentValue) throws Exception;

    // Stores the new vertex value; vertices that do not call it are unchanged
    public void setResult(VV value)

    // Utility methods
    public long getNumberOfVertices()
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}

Neighbor<VV, EV>

Represents a neighbor vertex and connecting edge in GSA iterations.

public class Neighbor<VV, EV> {
    public VV getNeighborValue()
    public EV getEdgeValue()
}

GSAConfiguration

Configuration options for GSA iterations.

public class GSAConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
    public void setOptNumVertices(boolean optNumVertices)
}

Usage Example:

// Connected Components using GSA.
// Assumes an undirected graph whose vertex values are initialized to their own IDs.
public class CCGather extends GatherFunction<Long, NullValue, Long> {
    @Override
    public Long gather(Neighbor<Long, NullValue> neighbor) {
        return neighbor.getNeighborValue();
    }
}

public class CCSum extends SumFunction<Long, NullValue, Long> {
    @Override
    public Long sum(Long value1, Long value2) {
        return Math.min(value1, value2);
    }
}

public class CCApply extends ApplyFunction<Long, Long, Long> {
    @Override
    public void apply(Long newValue, Long currentValue) {
        // Update only when a smaller component ID arrives, so the iteration
        // terminates once no vertex changes
        if (newValue < currentValue) {
            setResult(newValue);
        }
    }
}

// Run Connected Components
Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(
    new CCGather(),
    new CCSum(),
    new CCApply(),
    maxIterations
);
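A single-machine model makes the three GSA phases explicit. The plain-Java sketch below (hypothetical class `GsaComponentsSketch`, no Flink dependency) treats each edge as undirected, gathers each neighbor's current label, sums the gathered labels with `Math.min`, and applies the result only when it is smaller than the vertex's current label; the loop ends when no label changes. As in the example, each vertex's initial value is assumed to be its own ID.

```java
// Hypothetical single-machine model of GSA connected components.
final class GsaComponentsSketch {

    // edges[i] = {u, v}, treated as undirected; returns a component label per vertex.
    static long[] run(int numVertices, int[][] edges, int maxIterations) {
        long[] label = new long[numVertices];
        for (int v = 0; v < numVertices; v++) {
            label[v] = v; // initial value: the vertex's own ID
        }
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            // Gather + sum: fold neighbor labels with min, per target vertex.
            long[] summed = new long[numVertices];
            boolean[] hasNeighbor = new boolean[numVertices];
            for (int[] e : edges) {
                for (int side = 0; side < 2; side++) {
                    int target = e[side];
                    long gathered = label[e[1 - side]]; // gather: neighbor's value
                    summed[target] = hasNeighbor[target]
                            ? Math.min(summed[target], gathered) // sum
                            : gathered;
                    hasNeighbor[target] = true;
                }
            }
            // Apply: keep the summed value only if it is smaller.
            boolean changed = false;
            for (int v = 0; v < numVertices; v++) {
                if (hasNeighbor[v] && summed[v] < label[v]) {
                    label[v] = summed[v];
                    changed = true;
                }
            }
            // No vertex changed: the iteration terminates.
            if (!changed) {
                break;
            }
        }
        return label;
    }
}
```

For six vertices with edges {0,1}, {1,2}, {3,4}, the labels converge to 0, 0, 0, 3, 3, 5; isolated vertex 5 keeps its own ID because it gathers nothing.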

Common Patterns

Convergence Detection

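Each iteration runs for at most maxIterations supersteps and can end earlier once there is no work left, for example when no messages are sent or no vertex value is updated. To track progress explicitly, such as the number of vertices that changed in a superstep, all three models support iteration aggregators:

// Register the aggregator on the iteration configuration before running
configuration.registerAggregator("changed", new LongSumAggregator());

// Inside a compute/scatter/gather/apply function: count changed vertices
// (valueChanged stands for the algorithm's own change test)
LongSumAggregator changedVertices = getIterationAggregator("changed");
if (valueChanged) {
    changedVertices.aggregate(1L);
}

// In the following superstep, read the total aggregated in the previous one
LongValue previouslyChanged = getPreviousIterationAggregate("changed");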
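The counting pattern can be illustrated without Flink. In this plain-Java sketch, the hypothetical `SumCounter` class stands in for a sum aggregator: it is reset at the start of every superstep, each vertex that changes adds 1, and the loop stops once a superstep reports zero changes. In this model the loop reads the counter directly, which is not how Gelly exposes aggregates. The workload, max-value propagation along a path, is chosen only because its change counts are easy to follow by hand.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an aggregator-driven convergence check.
final class ConvergenceSketch {

    // Stand-in for a sum aggregator: reset each superstep, summed across vertices.
    static final class SumCounter {
        private long sum;

        void aggregate(long value) { sum += value; }
        long get() { return sum; }
        void reset() { sum = 0; }
    }

    // Max-value propagation over the path 0-1-...-(n-1); updates values in place.
    // Returns the "changed vertices" count recorded after each superstep.
    static List<Long> run(long[] values, int maxIterations) {
        SumCounter changed = new SumCounter();
        List<Long> history = new ArrayList<>();
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            changed.reset();
            long[] next = values.clone();
            for (int v = 0; v < values.length; v++) {
                long best = values[v];
                if (v > 0) best = Math.max(best, values[v - 1]);
                if (v < values.length - 1) best = Math.max(best, values[v + 1]);
                if (best != values[v]) {
                    next[v] = best;
                    changed.aggregate(1L); // this vertex changed
                }
            }
            System.arraycopy(next, 0, values, 0, values.length);
            history.add(changed.get());
            // Converged: no vertex changed in this superstep.
            if (changed.get() == 0) {
                break;
            }
        }
        return history;
    }
}
```

For the path values 5, 1, 1, 1 the recorded counts are 1, 1, 1, 0: the maximum moves one hop per superstep, and the fourth superstep confirms convergence.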

Performance Optimization

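  • Message Combiners: Reduce network traffic in the vertex-centric model by merging messages addressed to the same vertex
  • Degree Optimization: Enable setOptDegrees(true) in scatter-gather iterations for algorithms that need vertex in/out degrees
  • Memory Management: setSolutionSetUnmanagedMemory(true) keeps the solution set as objects on the heap instead of in Flink's managed memory
  • Parallelism: Use setParallelism() to set the parallelism of the iteration's operators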

Error Handling

All user-defined functions can throw exceptions that will be propagated and cause job failure:

@Override
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages)
        throws Exception {

    // Fail fast on invalid input; the exception propagates and fails the job
    if (vertex.getValue() < 0) {
        throw new IllegalArgumentException("Negative vertex value: " + vertex.getValue());
    }
    // ... computation logic
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-gelly-2-10
