Gelly: Flink Graph API - A comprehensive graph processing library for Apache Flink
—
This section covers the core functionality for creating graphs from various data sources and for transforming a graph's structure and its vertex and edge data. Gelly provides several graph construction methods, plus transformation operations that modify either the graph's structure or its data.
Create graphs from in-memory Java collections of vertices and edges.
// Build a graph from explicit in-memory collections of vertices and edges.
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(
Collection<Vertex<K, VV>> vertices,
Collection<Edge<K, EV>> edges,
ExecutionEnvironment context)
// Build a graph from edges only; the vertices are generated from the edges and carry NullValue values.
public static <K, EV> Graph<K, NullValue, EV> fromCollection(
Collection<Edge<K, EV>> edges,
ExecutionEnvironment context)
// Build a graph from edges only; each generated vertex gets the value vertexValueInitializer computes from its ID.
public static <K, VV, EV> Graph<K, VV, EV> fromCollection(
Collection<Edge<K, EV>> edges,
MapFunction<K, VV> vertexValueInitializer,
ExecutionEnvironment context)
Usage Example:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// Create vertices
List<Vertex<Long, String>> vertices = Arrays.asList(
new Vertex<>(1L, "Alice"),
new Vertex<>(2L, "Bob"),
new Vertex<>(3L, "Charlie")
);
// Create edges
List<Edge<Long, Double>> edges = Arrays.asList(
new Edge<>(1L, 2L, 0.8),
new Edge<>(2L, 3L, 0.6),
new Edge<>(3L, 1L, 0.9)
);
// Create graph from collections
Graph<Long, String, Double> graph = Graph.fromCollection(vertices, edges, env);
// Create graph from edges only (vertices auto-generated with NullValue)
Graph<Long, NullValue, Double> simpleGraph = Graph.fromCollection(edges, env);
// Create graph with vertex initializer.
// Gelly derives the vertex value type (VV) reflectively from the MapFunction's class.
// A lambda's synthetic class does not expose those type arguments, so type extraction
// can fail. An anonymous class keeps them visible.
Graph<Long, Long, Double> idGraph = Graph.fromCollection(
edges,
new MapFunction<Long, Long>() {
    @Override
    public Long map(Long id) {
        return id * 100L; // Initialize vertex value as ID * 100
    }
},
env
);
Create graphs from Flink DataSets for distributed graph processing.
// DataSet counterparts of fromCollection: vertices and edges supplied as Flink DataSets.
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(
DataSet<Vertex<K, VV>> vertices,
DataSet<Edge<K, EV>> edges,
ExecutionEnvironment context)
// Edges only: vertex values are NullValue.
public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
DataSet<Edge<K, EV>> edges,
ExecutionEnvironment context)
// Edges only: each vertex value is computed from the vertex ID by vertexValueInitializer.
public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(
DataSet<Edge<K, EV>> edges,
MapFunction<K, VV> vertexValueInitializer,
ExecutionEnvironment context)Create graphs from tuple DataSets using standard Flink tuple types.
// Tuple variants: a vertex is Tuple2<id, value>; an edge is Tuple3<source, target, value>.
public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(
DataSet<Tuple2<K, VV>> vertices,
DataSet<Tuple3<K, K, EV>> edges,
ExecutionEnvironment context)
// Edge tuples only: vertex values are NullValue.
public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(
DataSet<Tuple3<K, K, EV>> edges,
ExecutionEnvironment context)
// Edge tuples only: each vertex value is computed from the vertex ID by vertexValueInitializer.
public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(
DataSet<Tuple3<K, K, EV>> edges,
MapFunction<K, VV> vertexValueInitializer,
ExecutionEnvironment context)Usage Example:
// `env` is an ExecutionEnvironment, e.g. ExecutionEnvironment.getExecutionEnvironment()
// Create vertex tuples (ID, Value)
DataSet<Tuple2<Long, String>> vertexTuples = env.fromElements(
new Tuple2<>(1L, "Alice"),
new Tuple2<>(2L, "Bob"),
new Tuple2<>(3L, "Charlie")
);
// Create edge tuples (Source, Target, Value)
DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.fromElements(
new Tuple3<>(1L, 2L, 0.8),
new Tuple3<>(2L, 3L, 0.6),
new Tuple3<>(3L, 1L, 0.9)
);
// Create graph from tuples
Graph<Long, String, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env);Create graphs from CSV files using Flink's CSV reader capabilities.
// Entry point: returns a GraphCsvReader that is configured before the typed Graph is built.
// NOTE(review): in the Gelly releases I know, the single-path overload takes the EDGES file,
// and vertices come from fromCsvReader(verticesPath, edgesPath, context). Confirm that the
// vertices(String)/edges(String) methods listed below exist in this version.
public static GraphCsvReader fromCsvReader(String filePath, ExecutionEnvironment context)
// GraphCsvReader methods
public GraphCsvReader vertices(String filePath)
public GraphCsvReader edges(String filePath)
// Finishes configuration: declares the key, vertex value and edge value classes and returns the Graph.
public <K, VV, EV> Graph<K, VV, EV> types(Class<K> keyClass, Class<VV> vertexValueClass, Class<EV> edgeValueClass)Transform vertex values while preserving graph structure.
// Returns a graph with the same vertices and edges, where each vertex value is replaced by mapper(vertex).
public <NV> Graph<K, NV, EV> mapVertices(MapFunction<Vertex<K, VV>, NV> mapper)
Usage Example:
// The new value type (NV) is extracted reflectively from the MapFunction's class.
// Anonymous classes keep its type arguments visible, whereas lambdas may not.
// Transform vertex values to uppercase
Graph<Long, String, Double> upperGraph = graph.mapVertices(
    new MapFunction<Vertex<Long, String>, String>() {
        @Override
        public String map(Vertex<Long, String> vertex) {
            return vertex.getValue().toUpperCase();
        }
    }
);
// Convert vertex values to their length
Graph<Long, Integer, Double> lengthGraph = graph.mapVertices(
    new MapFunction<Vertex<Long, String>, Integer>() {
        @Override
        public Integer map(Vertex<Long, String> vertex) {
            return vertex.getValue().length();
        }
    }
);
Transform edge values while preserving graph structure.
// Returns a graph with the same vertices and edges, where each edge value is replaced by mapper(edge).
public <NE> Graph<K, VV, NE> mapEdges(MapFunction<Edge<K, EV>, NE> mapper)
Usage Example:
// Square all edge weights
Graph<Long, String, Double> squaredGraph = graph.mapEdges(
    new MapFunction<Edge<Long, Double>, Double>() {
        @Override
        public Double map(Edge<Long, Double> edge) {
            return edge.getValue() * edge.getValue();
        }
    }
);
// Convert edge weights to strings
Graph<Long, String, String> stringGraph = graph.mapEdges(
    new MapFunction<Edge<Long, Double>, String>() {
        @Override
        public String map(Edge<Long, Double> edge) {
            return "weight:" + edge.getValue();
        }
    }
);
Translate graph IDs and values using the ASM translation framework.
public <NK> Graph<NK, VV, EV> translateGraphIds(TranslateFunction<K, NK> translator)
public <NVV> Graph<K, NVV, EV> translateVertexValues(TranslateFunction<VV, NVV> translator)
public <NEV> Graph<K, VV, NEV> translateEdgeValues(TranslateFunction<EV, NEV> translator)Usage Example:
// Translate Long IDs to String IDs
Graph<String, String, Double> stringIdGraph = graph.translateGraphIds(
new LongValueToStringValue()
);
// Add offset to all vertex IDs
Graph<Long, String, Double> offsetGraph = graph.translateGraphIds(
new LongValueAddOffset(1000L)
);Filter vertices and edges based on predicates.
// Keeps only the vertices for which vertexFilter returns true.
// NOTE(review): confirm whether edges incident to removed vertices are dropped as well.
public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter)
// Keeps only the edges for which edgeFilter returns true.
public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter)
// Applies both a vertex predicate and an edge predicate in one call.
public Graph<K, VV, EV> subgraph(
FilterFunction<Vertex<K, VV>> vertexFilter,
FilterFunction<Edge<K, EV>> edgeFilter)Usage Example:
// Filter vertices by value
Graph<Long, String, Double> filteredVertices = graph.filterOnVertices(
vertex -> vertex.getValue().length() > 3
);
// Filter edges by weight
Graph<Long, String, Double> filteredEdges = graph.filterOnEdges(
edge -> edge.getValue() > 0.7
);
// Create subgraph with both filters
Graph<Long, String, Double> subgraph = graph.subgraph(
vertex -> vertex.getValue().startsWith("A"),
edge -> edge.getValue() > 0.5
);Perform set operations between graphs.
// Set operations between two graphs with the same key, vertex value and edge value types.
public Graph<K, VV, EV> union(Graph<K, VV, EV> other)
public Graph<K, VV, EV> difference(Graph<K, VV, EV> other)
// NOTE(review): some Gelly releases declare intersect(Graph, boolean distinctEdges) returning
// Graph<K, NullValue, EV>; verify the single-argument form shown here for this version.
public Graph<K, VV, EV> intersect(Graph<K, VV, EV> other)Usage Example:
Graph<Long, String, Double> graph1 = /* ... */;
Graph<Long, String, Double> graph2 = /* ... */;
// Union of two graphs
// NOTE(review): Gelly documents union as removing duplicate vertices but keeping duplicate edges; confirm.
Graph<Long, String, Double> unionGraph = graph1.union(graph2);
// Difference: graph1 minus the elements it shares with graph2.
// NOTE(review): Gelly documents difference as removing common vertices as well as common
// edges (plus edges touching a removed vertex), i.e. more than "edges in graph1 but not
// in graph2". Confirm against Graph#difference for this version.
Graph<Long, String, Double> diffGraph = graph1.difference(graph2);
// Intersection (common edges)
Graph<Long, String, Double> intersectGraph = graph1.intersect(graph2);Modify graph structure and directionality.
// Flips the direction of every edge (source <-> target), as Edge.reverse() does for a single edge.
public Graph<K, VV, EV> reverse()
// Adds the reversed counterpart of each edge, so every connection exists in both directions.
public Graph<K, VV, EV> getUndirected()Usage Example:
// Reverse all edge directions
Graph<Long, String, Double> reversedGraph = graph.reverse();
// Convert to undirected graph (adds reverse edges)
Graph<Long, String, Double> undirectedGraph = graph.getUndirected();Represents a graph vertex with ID and value.
/**
 * A graph vertex stored as a {@code Tuple2}: {@code f0} holds the vertex ID and
 * {@code f1} holds the vertex value.
 *
 * @param <K> the vertex ID type
 * @param <V> the vertex value type
 */
public class Vertex<K, V> extends Tuple2<K, V> {

    /** Creates a vertex whose ID and value are not set. */
    public Vertex() {}

    /**
     * Creates a vertex with the given ID and value.
     *
     * @param id    the vertex ID, stored in {@code f0}
     * @param value the vertex value, stored in {@code f1}
     */
    public Vertex(K id, V value) {
        // The tuple fields must be populated here; an empty constructor body
        // would silently discard both arguments.
        this.f0 = id;
        this.f1 = value;
    }

    /** @return the vertex ID ({@code f0}) */
    public K getId() { return f0; }

    /** @return the vertex value ({@code f1}) */
    public V getValue() { return f1; }

    /** @param id the new vertex ID, stored in {@code f0} */
    public void setId(K id) { f0 = id; }

    /** @param value the new vertex value, stored in {@code f1} */
    public void setValue(V value) { f1 = value; }
}
Represents a graph edge with source, target, and value.
/**
 * A directed graph edge stored as a {@code Tuple3}: {@code f0} is the source ID,
 * {@code f1} is the target ID and {@code f2} is the edge value.
 *
 * @param <K> the vertex ID type
 * @param <V> the edge value type
 */
public class Edge<K, V> extends Tuple3<K, K, V> {

    /** Creates an edge whose source, target and value are not set. */
    public Edge() {}

    /**
     * Creates an edge from {@code source} to {@code target} carrying {@code value}.
     *
     * @param source the source vertex ID, stored in {@code f0}
     * @param target the target vertex ID, stored in {@code f1}
     * @param value  the edge value, stored in {@code f2}
     */
    public Edge(K source, K target, V value) {
        // Populate the tuple fields; an empty body would discard the arguments.
        this.f0 = source;
        this.f1 = target;
        this.f2 = value;
    }

    /** @return the source vertex ID ({@code f0}) */
    public K getSource() { return f0; }

    /** @return the target vertex ID ({@code f1}) */
    public K getTarget() { return f1; }

    /** @return the edge value ({@code f2}) */
    public V getValue() { return f2; }

    /** @param source the new source vertex ID, stored in {@code f0} */
    public void setSource(K source) { f0 = source; }

    /** @param target the new target vertex ID, stored in {@code f1} */
    public void setTarget(K target) { f1 = target; }

    /** @param value the new edge value, stored in {@code f2} */
    public void setValue(V value) { f2 = value; }

    /**
     * Returns a new edge with source and target swapped and the same value.
     * This edge is left unchanged.
     */
    public Edge<K, V> reverse() { return new Edge<>(f1, f0, f2); }
}
Represents an edge with its source and target vertex values.
/**
 * An edge together with the values of its two endpoint vertices, stored as a
 * {@code Tuple5}: {@code f0} source ID, {@code f1} target ID, {@code f2} source
 * vertex value, {@code f3} target vertex value, {@code f4} edge value.
 *
 * @param <K>  the vertex ID type
 * @param <VV> the vertex value type
 * @param <EV> the edge value type
 */
public class Triplet<K, VV, EV> extends Tuple5<K, K, VV, VV, EV> {

    /** Creates a triplet whose fields are not set. */
    public Triplet() {}

    /**
     * Creates a triplet from its two endpoint vertices and the connecting edge.
     * The IDs are taken from the vertices; only the value is read from {@code edge}.
     *
     * @param srcVertex the source vertex
     * @param trgVertex the target vertex
     * @param edge      the edge whose value is stored in {@code f4}
     */
    public Triplet(Vertex<K, VV> srcVertex, Vertex<K, VV> trgVertex, Edge<K, EV> edge) {
        this(srcVertex.getId(), trgVertex.getId(),
                srcVertex.getValue(), trgVertex.getValue(), edge.getValue());
    }

    /**
     * Creates a triplet from its five components.
     *
     * @param srcId   source vertex ID ({@code f0})
     * @param trgId   target vertex ID ({@code f1})
     * @param srcVal  source vertex value ({@code f2})
     * @param trgVal  target vertex value ({@code f3})
     * @param edgeVal edge value ({@code f4})
     */
    public Triplet(K srcId, K trgId, VV srcVal, VV trgVal, EV edgeVal) {
        this.f0 = srcId;
        this.f1 = trgId;
        this.f2 = srcVal;
        this.f3 = trgVal;
        this.f4 = edgeVal;
    }

    /** @return a new Vertex built from the source ID ({@code f0}) and source value ({@code f2}) */
    public Vertex<K, VV> getSrcVertex() { return new Vertex<>(f0, f2); }

    /** @return a new Vertex built from the target ID ({@code f1}) and target value ({@code f3}) */
    public Vertex<K, VV> getTrgVertex() { return new Vertex<>(f1, f3); }

    /** @return a new Edge built from the source ID, target ID and edge value ({@code f4}) */
    public Edge<K, EV> getEdge() { return new Edge<>(f0, f1, f4); }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gelly-2-10