Gelly: Flink Graph API - A comprehensive graph processing library for Apache Flink
—
Gelly provides methods for accessing graph data, computing over vertex neighborhoods, joining graphs with external data, and integrating with Flink's DataSet API. It also offers graph validation, format conversion, and utility functions for common graph operations.
Access the fundamental graph components as Flink DataSets.
// Core data access: each graph component is exposed as a Flink DataSet.
// All vertices, as Vertex<K, VV> (id, value).
public DataSet<Vertex<K, VV>> getVertices()
// All edges, as Edge<K, EV> (source id, target id, value).
public DataSet<Edge<K, EV>> getEdges()
// Every edge together with the values of its source and target vertices.
public DataSet<Triplet<K, VV, EV>> getTriplets()
// ID extraction
// Vertex IDs only; vertex values are dropped.
public DataSet<K> getVertexIds()
// One (source id, target id) pair per edge; edge values are dropped.
public DataSet<Tuple2<K, K>> getEdgeIds()
Usage Example:
Graph<Long, String, Double> graph = /* ... */;
// Access vertices and edges
DataSet<Vertex<Long, String>> vertices = graph.getVertices();
DataSet<Edge<Long, Double>> edges = graph.getEdges();
// Process vertices
DataSet<String> vertexValues = vertices.map(vertex -> vertex.getValue());
vertexValues.print();
// Process edges
DataSet<Double> edgeWeights = edges.map(edge -> edge.getValue());
// Average weight = sum of weights / number of edges.
// A pairwise reduce such as (a, b) -> (a + b) / 2 is not a mean; its result
// depends on the reduction order. Concatenating a DataSet into a String also
// prints the DataSet object, not its contents. So sum and count are computed
// together, and the result is collected before printing.
DataSet<Tuple2<Double, Long>> weightSumAndCount = edgeWeights
    .map(weight -> new Tuple2<>(weight, 1L))
    // Lambdas that return generic types lose their type arguments to erasure, so
    // Flink needs an explicit hint (org.apache.flink.api.common.typeinfo.TypeHint).
    .returns(new TypeHint<Tuple2<Double, Long>>() {})
    .reduce((a, b) -> new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1));
List<Double> averageWeight = weightSumAndCount
    .map(sumAndCount -> sumAndCount.f0 / sumAndCount.f1)
    .collect();
// reduce() emits nothing for an empty input, so a graph without edges has no average
System.out.println("Average weight: "
    + (averageWeight.isEmpty() ? "n/a (no edges)" : averageWeight.get(0)));
// Get triplets (edges with vertex values)
DataSet<Triplet<Long, String, Double>> triplets = graph.getTriplets();
triplets.print(); // Shows: (srcId, trgId, srcValue, trgValue, edgeValue)
Perform computations on vertex neighborhoods with various aggregation patterns.
Operate on edges incident to each vertex.
// Applies edgesFunction to the edges incident to each vertex. `direction`
// (e.g. EdgeDirection.OUT or EdgeDirection.ALL) selects which incident edges are used.
public <T> DataSet<T> groupReduceOnEdges(
    EdgesFunction<K, EV, T> edgesFunction,
    EdgeDirection direction)
// Same as above, but the function also receives the vertex itself (id and value).
public <T> DataSet<T> groupReduceOnEdges(
    EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
    EdgeDirection direction)
// Combines the values of each vertex's incident edges pairwise.
// The result is a DataSet of (vertex id, reduced edge value) pairs, not of bare EV values.
public DataSet<Tuple2<K, EV>> reduceOnEdges(
    ReduceEdgesFunction<EV> reduceEdgesFunction,
    EdgeDirection direction)
Operate on neighboring vertices.
// Applies neighborsFunction to the neighbors of each vertex (together with the
// connecting edges). `direction` selects which neighbors are considered.
public <T> DataSet<T> groupReduceOnNeighbors(
    NeighborsFunction<K, VV, EV, T> neighborsFunction,
    EdgeDirection direction)
// Same as above, but the function also receives the vertex itself (id and value).
public <T> DataSet<T> groupReduceOnNeighbors(
    NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
    EdgeDirection direction)
// Combines the values of each vertex's neighbors pairwise.
// The result is a DataSet of (vertex id, reduced neighbor value) pairs, not of bare VV values.
public DataSet<Tuple2<K, VV>> reduceOnNeighbors(
    ReduceNeighborsFunction<VV> reduceNeighborsFunction,
    EdgeDirection direction)
Usage Example:
// Count outgoing edges for each vertex
DataSet<Tuple2<Long, Integer>> outDegrees = graph.groupReduceOnEdges(
    new EdgesFunction<Long, Double, Tuple2<Long, Integer>>() {
        @Override
        public void iterateEdges(
                Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
                Collector<Tuple2<Long, Integer>> out) {
            int count = 0;
            Long vertexId = null;
            for (Tuple2<Long, Edge<Long, Double>> edge : edges) {
                vertexId = edge.f0; // ID of the vertex whose edges are grouped
                count++;
            }
            out.collect(new Tuple2<>(vertexId, count));
        }
    },
    EdgeDirection.OUT
);
// Concatenate the values of each vertex's neighbors (in both directions)
DataSet<Tuple2<Long, String>> neighborValues = graph.groupReduceOnNeighbors(
    new NeighborsFunction<Long, String, Double, Tuple2<Long, String>>() {
        @Override
        public void iterateNeighbors(
                // Each element is (vertex id, connecting edge, neighbor vertex)
                Iterable<Tuple3<Long, Edge<Long, Double>, Vertex<Long, String>>> neighbors,
                Collector<Tuple2<Long, String>> out) {
            StringBuilder joined = new StringBuilder();
            Long vertexId = null;
            for (Tuple3<Long, Edge<Long, Double>, Vertex<Long, String>> neighbor : neighbors) {
                vertexId = neighbor.f0;
                // f2 is the neighbor vertex; f1 is the connecting edge
                joined.append(neighbor.f2.getValue()).append(" ");
            }
            out.collect(new Tuple2<>(vertexId, joined.toString().trim()));
        }
    },
    EdgeDirection.ALL
);
Core interfaces for neighborhood operations.
Process edges incident to each vertex.
// Called once per vertex group with the edges incident to that vertex.
// In each element, f0 is the ID of the grouped vertex and f1 is an incident edge.
// Results are emitted through `out`; any number may be emitted.
public interface EdgesFunction<K, EV, O> extends Function, Serializable {
void iterateEdges(
Iterable<Tuple2<K, Edge<K, EV>>> edges,
Collector<O> out) throws Exception;
}
Process edges with access to the vertex value.
// Like EdgesFunction, but it receives the vertex (id and value) directly.
// Its incident edges arrive as plain Edge objects.
public interface EdgesFunctionWithVertexValue<K, VV, EV, T> extends Function, Serializable {
void iterateEdges(
Vertex<K, VV> vertex,
Iterable<Edge<K, EV>> edges,
Collector<T> out) throws Exception;
}
Process neighboring vertices and connecting edges.
// Called once per vertex with that vertex's neighborhood. Each element is
// (ID of the vertex being processed, connecting edge, neighbor vertex):
// f0 = vertex ID, f1 = Edge, f2 = neighbor Vertex.
public interface NeighborsFunction<K, VV, EV, T> extends Function, Serializable {
    void iterateNeighbors(
        Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors,
        Collector<T> out) throws Exception;
}
Process neighbors with access to the value of the vertex being processed.
// Like NeighborsFunction, but it receives the vertex being processed directly.
// Each neighbor element is (connecting edge, neighbor vertex):
// f0 = Edge, f1 = neighbor Vertex.
public interface NeighborsFunctionWithVertexValue<K, VV, EV, T> extends Function, Serializable {
    void iterateNeighbors(
        Vertex<K, VV> vertex,
        Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
        Collector<T> out) throws Exception;
}
Simple reduction operations on edges and neighbors.
// Combines two edge values into one.
// reduceOnEdges applies it pairwise to reduce all incident edge values of a vertex.
public interface ReduceEdgesFunction<EV> extends Function, Serializable {
EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue) throws Exception;
}
// Combines two neighbor vertex values into one.
// reduceOnNeighbors applies it pairwise to reduce all neighbor values of a vertex.
public interface ReduceNeighborsFunction<VV> extends Function, Serializable {
VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue) throws Exception;
}
Usage Example:
// Largest outgoing edge weight of every vertex: reduceOnEdges folds the
// out-edge values of each vertex pairwise and yields (vertex id, maximum).
DataSet<Tuple2<Long, Double>> maxWeights = graph.reduceOnEdges(
    (left, right) -> Math.max(left, right),
    EdgeDirection.OUT
);
// Neighbor values of every vertex (in- and out-neighbors), joined with "_".
// The order in which values are combined is not specified here, so the
// concatenation order may vary.
DataSet<Tuple2<Long, String>> concatenatedNeighbors = graph.reduceOnNeighbors(
    (left, right) -> left + "_" + right,
    EdgeDirection.ALL
);
Join graph components with external DataSets.
Join vertices with external data.
// Updates vertex values from an external DataSet of (vertex id, value) pairs.
// vertexJoinFunction combines a vertex's current value with the matching input
// value. It must return the vertex value type VV, so the graph's types are unchanged.
// Vertices with no matching input entry keep their original value.
// To change the vertex value type, use mapVertices instead.
public <T> Graph<K, VV, EV> joinWithVertices(
    DataSet<Tuple2<K, T>> inputDataSet,
    VertexJoinFunction<VV, T> vertexJoinFunction)
Join edges with external data.
// Updates edge values from an external DataSet of (source id, target id, value)
// triples. edgeJoinFunction must return the edge value type EV, so the graph's
// types are unchanged. Edges with no matching input entry keep their value.
public <T> Graph<K, VV, EV> joinWithEdges(
    DataSet<Tuple3<K, K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction)
// Variants that match edges on their source ID only, or on their target ID only,
// using (vertex id, value) pairs.
public <T> Graph<K, VV, EV> joinWithEdgesOnSource(
    DataSet<Tuple2<K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction)
public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(
    DataSet<Tuple2<K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction)
// Combines a vertex's current value with a matched input value.
// The returned value becomes the vertex's new value.
public interface VertexJoinFunction<VV, T> extends Function, Serializable {
    VV vertexJoin(VV vertexValue, T inputValue) throws Exception;
}
// Combines an edge's current value with a matched input value.
// The returned value becomes the edge's new value.
public interface EdgeJoinFunction<EV, T> extends Function, Serializable {
    EV edgeJoin(EV edgeValue, T inputValue) throws Exception;
}
Usage Example:
// External vertex data
DataSet<Tuple2<Long, Integer>> vertexAges = env.fromElements(
    new Tuple2<>(1L, 25),
    new Tuple2<>(2L, 30),
    new Tuple2<>(3L, 35)
);
// Join with vertices to add age information
Graph<Long, String, Double> enrichedGraph = graph.joinWithVertices(
    vertexAges,
    new VertexJoinFunction<String, Integer>() {
        @Override
        public String vertexJoin(String name, Integer age) {
            return name + "_age" + age;
        }
    }
);
// External edge data
DataSet<Tuple3<Long, Long, String>> edgeLabels = env.fromElements(
    new Tuple3<>(1L, 2L, "friendship"),
    new Tuple3<>(2L, 3L, "colleague")
);
// EdgeJoinFunction<EV, T> must return EV, so joinWithEdges cannot turn Double edge
// values into Strings by itself. Convert the edge values to String with mapEdges
// first, then join in the labels. Unlabeled edges keep their weight as a String.
Graph<Long, String, String> labeledGraph = enrichedGraph
    .mapEdges(new MapFunction<Edge<Long, Double>, String>() {
        @Override
        public String map(Edge<Long, Double> edge) {
            return String.valueOf(edge.getValue());
        }
    })
    .joinWithEdges(
        edgeLabels,
        new EdgeJoinFunction<String, String>() {
            @Override
            public String edgeJoin(String weight, String label) {
                return label + "_" + weight;
            }
        }
    );
Validate graph structure and properties.
// Runs the validator against this graph and returns its verdict
// (true if the graph passes). It does not return a new Graph.
public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception
// Base type for custom graph checks. It is an abstract class, not an interface;
// subclass it (anonymously or by name) and implement validate.
public abstract class GraphValidator<K, VV, EV> implements Serializable {
    public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
}
Usage Example:
// Custom validator to check for self-loops
GraphValidator<Long, String, Double> noSelfLoopValidator =
    new GraphValidator<Long, String, Double>() {
        @Override
        public boolean validate(Graph<Long, String, Double> graph) throws Exception {
            // count() runs a job and returns the number of self-loop edges
            long selfLoopCount = graph.getEdges()
                .filter(edge -> edge.getSource().equals(edge.getTarget()))
                .count();
            return selfLoopCount == 0;
        }
    };
// Validate graph: validate() returns the validator's verdict, not a Graph
boolean isValid = graph.validate(noSelfLoopValidator);
System.out.println("Graph has no self-loops: " + isValid);
Conversion utilities for different data formats.
// Convert between tuples and graph types. Each class is a MapFunction for use with DataSet.map.
// Tuple2 (id, value) -> Vertex
public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>>
// Tuple3 (source id, target id, value) -> Edge
public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>>
// Vertex -> Tuple2 (id, value)
public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>>
// Edge -> Tuple3 (source id, target id, value)
public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>>
Usage Example:
// Convert graph to tuple format for export
DataSet<Tuple2<Long, String>> vertexTuples = graph.getVertices()
    .map(new VertexToTuple2Map<Long, String>());
DataSet<Tuple3<Long, Long, Double>> edgeTuples = graph.getEdges()
    .map(new EdgeToTuple3Map<Long, Double>());
// Export to CSV. Sinks are lazy: nothing is written until the job is executed.
vertexTuples.writeAsCsv("vertices.csv");
edgeTuples.writeAsCsv("edges.csv");
env.execute("Export graph to CSV");
// Import from tuples
DataSet<Vertex<Long, String>> importedVertices = vertexTuples
    .map(new Tuple2ToVertexMap<Long, String>());
DataSet<Edge<Long, Double>> importedEdges = edgeTuples
    .map(new Tuple3ToEdgeMap<Long, Double>());
Graph<Long, String, Double> importedGraph = Graph.fromDataSet(
    importedVertices, importedEdges, env);
// Equivalent shortcut: Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)
Seamless integration with Flink's DataSet operations.
Usage Example:
// Use Flink operations on graph components
DataSet<Vertex<Long, String>> vertices = graph.getVertices();
// Standard DataSet operations. Lambdas that return generic types (Vertex, Tuple2,
// Tuple3) lose their type arguments to erasure, so each one is followed by
// .returns(...) with a TypeHint (org.apache.flink.api.common.typeinfo.TypeHint).
DataSet<Vertex<Long, String>> filteredVertices = vertices
    .filter(vertex -> vertex.getValue().length() > 3)
    .map(vertex -> new Vertex<>(vertex.getId(), vertex.getValue().toUpperCase()))
    .returns(new TypeHint<Vertex<Long, String>>() {});
// Group operations: count vertices by the first character of their value
DataSet<Tuple2<String, Long>> vertexGroups = vertices
    .filter(vertex -> !vertex.getValue().isEmpty()) // substring(0, 1) fails on ""
    .map(vertex -> new Tuple2<>(vertex.getValue().substring(0, 1), 1L))
    .returns(new TypeHint<Tuple2<String, Long>>() {})
    .groupBy(0)
    .sum(1);
// Join with other DataSets
DataSet<Tuple2<Long, Integer>> externalData = /* ... */;
DataSet<Tuple3<Long, String, Integer>> joined = vertices
    .join(externalData)
    .where("f0").equalTo("f0")
    .with((vertex, data) -> new Tuple3<>(vertex.getId(), vertex.getValue(), data.f1))
    .returns(new TypeHint<Tuple3<Long, String, Integer>>() {});
// Create new graph from processed data. Graph.fromDataSet keeps edges whose
// endpoints were filtered out above. To drop them, use graph.filterOnVertices(...),
// which returns a subgraph without such dangling edges.
Graph<Long, String, Double> processedGraph = Graph.fromDataSet(
    filteredVertices, graph.getEdges(), env);

// Reuse frequently accessed components. The DataSet API has no cache() method.
// A DataSet used by several operations is a shared node of the program's
// dataflow plan, so within one job it is computed once and fed to every consumer.
DataSet<Vertex<Long, String>> vertices = graph.getVertices();
DataSet<Edge<Long, Double>> edges = graph.getEdges();
// Reuse the same DataSets
DataSet<String> values1 = vertices.map(v -> v.getValue());
DataSet<String> values2 = vertices.map(v -> v.getValue().toUpperCase());

// Flink's mutable Value types (LongValue, StringValue, DoubleValue) can be reused
// across records, which reduces object creation in large-scale jobs.
Graph<LongValue, StringValue, DoubleValue> efficientGraph = /* ... */;
// Boxed Long/Double and String are immutable, so every value is a separate object.
Graph<Long, String, Double> lessEfficientGraph = /* ... */;

// Configure the default parallelism of the execution environment
env.setParallelism(8);
// Operators without an explicitly set parallelism run with this default
DataSet<Vertex<Long, String>> vertices = graph.getVertices(); // Uses parallelism 8
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gelly-2-10