Gelly: Flink Graph API - A comprehensive graph processing library for Apache Flink
—
Gelly's Analytics framework (ASM - Analytics, Statistics, Metrics) provides comprehensive tools for computing graph statistics, degree distributions, and structural metrics. The framework includes both graph-level analytics and general DataSet analytics with efficient accumulator-based result collection.
Analytics implement either GraphAnalytic for graph-specific analysis or DataSetAnalytic for general dataset analysis.
public interface GraphAnalytic<K, VV, EV, T> {
T getResult();
T execute() throws Exception;
T execute(String executionName) throws Exception;
GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input) throws Exception;
}
public interface DataSetAnalytic<T, R> {
R getResult();
R execute() throws Exception;
R execute(String executionName) throws Exception;
DataSetAnalytic<T, R> run(DataSet<T> input);
}
// Execute analytics on graph
public <T> T run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception
Core graph statistics available as direct methods on the Graph class.
// Vertex and edge counts
public long numberOfVertices() throws Exception
public long numberOfEdges() throws Exception
// Degree computations
public DataSet<Tuple2<K, LongValue>> inDegrees()
public DataSet<Tuple2<K, LongValue>> outDegrees()
public DataSet<Tuple2<K, LongValue>> getDegrees()
Usage Example:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, String, Double> graph = /* ... */;
// Basic statistics
long vertexCount = graph.numberOfVertices();
long edgeCount = graph.numberOfEdges();
System.out.println("Graph has " + vertexCount + " vertices and " + edgeCount + " edges");
// Degree analysis
DataSet<Tuple2<Long, LongValue>> degrees = graph.getDegrees();
degrees.print(); // Print all vertex degrees
// Find vertices with high degree
DataSet<Tuple2<Long, LongValue>> highDegree = degrees
.filter(degree -> degree.f1.getValue() > 5);
General-purpose analytics for any DataSet, including graph components.
Count elements in a DataSet with optional execution naming.
public class Count<T> extends AbstractDataSetAnalytic<T, LongValue> {
public Count()
}
Usage Example:
// Count vertices
Count<Vertex<Long, String>> vertexCount = new Count<>();
LongValue count = graph.getVertices().run(vertexCount).execute();
// Count edges
Count<Edge<Long, Double>> edgeCount = new Count<>();
LongValue edgeCountResult = graph.getEdges().run(edgeCount).execute();
Collect DataSet elements to the driver program.
public class Collect<T> extends AbstractDataSetAnalytic<T, List<T>> {
public Collect()
}
Usage Example:
// Collect small datasets
Collect<Vertex<Long, String>> collector = new Collect<>();
List<Vertex<Long, String>> vertices = graph.getVertices().run(collector).execute();
for (Vertex<Long, String> vertex : vertices) {
System.out.println("Vertex: " + vertex.getId() + " = " + vertex.getValue());
}
Compute checksums for data validation and testing.
public class ChecksumHashCode<T> extends AbstractDataSetAnalytic<T, Tuple3<IntValue, LongValue, LongValue>> {
public ChecksumHashCode()
}
Usage Example:
// Compute checksum of vertices
ChecksumHashCode<Vertex<Long, String>> checksum = new ChecksumHashCode<>();
Tuple3<IntValue, LongValue, LongValue> result = graph.getVertices().run(checksum).execute();
// Result contains: (hashCode, count, checksum)
System.out.println("Hash: " + result.f0.getValue());
System.out.println("Count: " + result.f1.getValue());
System.out.println("Checksum: " + result.f2.getValue());
Comprehensive degree analysis for both directed and undirected graphs.
Analytics for directed graphs with separate in-degree and out-degree analysis.
// Vertex degree annotation
public class VertexDegrees<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Vertex<K, Tuple3<K, LongValue, LongValue>>>>
public class VertexInDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Vertex<K, Tuple2<K, LongValue>>>>
public class VertexOutDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Vertex<K, Tuple2<K, LongValue>>>>
// Edge degree annotation
public class EdgeDegreesPair<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple4<K, LongValue, LongValue, EV>>>>
public class EdgeSourceDegrees<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple3<K, LongValue, LongValue, EV>>>>
public class EdgeTargetDegrees<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple3<K, LongValue, LongValue, EV>>>>
Usage Example:
// Annotate vertices with in-degree and out-degree
VertexDegrees<Long, String, Double> vertexDegrees = new VertexDegrees<>();
DataSet<Vertex<Long, Tuple3<Long, LongValue, LongValue>>> degreeResult = graph.run(vertexDegrees);
// Result format: Vertex<ID, (originalID, inDegree, outDegree)>
degreeResult.print();
// Annotate edges with source and target degrees
EdgeDegreesPair<Long, String, Double> edgeDegrees = new EdgeDegreesPair<>();
DataSet<Edge<Long, Tuple4<Long, LongValue, LongValue, Double>>> edgeResult = graph.run(edgeDegrees);
// Result format: Edge<source, target, (targetID, sourceInDegree, sourceOutDegree, originalEdgeValue)>
edgeResult.print();
Analytics for undirected graphs with single degree values.
// Vertex degree annotation
public class VertexDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Vertex<K, Tuple2<K, LongValue>>>>
// Edge degree annotation
public class EdgeDegreePair<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple3<K, LongValue, LongValue>>>>
public class EdgeSourceDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple2<K, LongValue>>>>
public class EdgeTargetDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple2<K, LongValue>>>>
Usage Example:
// For undirected graph analysis
Graph<Long, String, Double> undirectedGraph = graph.getUndirected();
// Annotate vertices with degree
VertexDegree<Long, String, Double> vertexDegree = new VertexDegree<>();
DataSet<Vertex<Long, Tuple2<Long, LongValue>>> result = undirectedGraph.run(vertexDegree);
// Result format: Vertex<ID, (originalID, degree)>
result.print();
// Find high-degree vertices
DataSet<Vertex<Long, Tuple2<Long, LongValue>>> highDegreeVertices = result
.filter(vertex -> vertex.getValue().f1.getValue() > 10);
Filter graphs based on degree constraints.
// Filter by maximum degree (undirected graphs)
public class MaximumDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, Graph<K, VV, EV>> {
public MaximumDegree(long maximumDegree)
}
Usage Example:
// Filter graph to only include vertices with degree <= 5
MaximumDegree<Long, String, Double> filter = new MaximumDegree<>(5L);
Graph<Long, String, Double> filteredGraph = graph.run(filter);
System.out.println("Original vertices: " + graph.numberOfVertices());
System.out.println("Filtered vertices: " + filteredGraph.numberOfVertices());
Comprehensive metrics for graph structure analysis.
public class EdgeMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, EdgeMetrics.Result> {
public EdgeMetrics()
}
public class VertexMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, VertexMetrics.Result> {
public VertexMetrics()
}
public class EdgeMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, EdgeMetrics.Result> {
public EdgeMetrics()
}
public class VertexMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, VertexMetrics.Result> {
public VertexMetrics()
}
Usage Example:
// Compute comprehensive vertex metrics
VertexMetrics<Long, String, Double> vertexMetrics =
new org.apache.flink.graph.library.metric.undirected.VertexMetrics<>();
VertexMetrics.Result metrics = graph.run(vertexMetrics);
// Access various metrics
System.out.println("Vertex count: " + metrics.getNumberOfVertices());
System.out.println("Edge count: " + metrics.getNumberOfEdges());
System.out.println("Average degree: " + metrics.getAverageDegree());
System.out.println("Density: " + metrics.getDensity());
System.out.println("Triplet count: " + metrics.getNumberOfTriplets());
// Compute edge metrics
EdgeMetrics<Long, String, Double> edgeMetrics =
new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<>();
EdgeMetrics.Result edgeResult = graph.run(edgeMetrics);
Common result types used by analytics.
// Unary result (single value)
public class UnaryResult<T> implements PrintableResult {
public T getValue()
}
// Binary result (two values)
public class BinaryResult<T0, T1> implements PrintableResult {
public T0 getValue0()
public T1 getValue1()
}
// Tertiary result (three values)
public class TertiaryResult<T0, T1, T2> implements PrintableResult {
public T0 getValue0()
public T1 getValue1()
public T2 getValue2()
}
// Printable result interface
public interface PrintableResult {
String toPrintableString()
}
public class CustomGraphAnalytic<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, MyResult> {
@Override
public MyResult execute() throws Exception {
// Implementation using graph data
DataSet<Vertex<K, VV>> vertices = input.getVertices();
DataSet<Edge<K, EV>> edges = input.getEdges();
// Compute custom analysis
MyResult result = computeCustomMetric(vertices, edges);
return result;
}
private MyResult computeCustomMetric(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges) {
// Custom computation logic
return new MyResult();
}
}
public class CustomDataSetAnalytic<T> extends AbstractDataSetAnalytic<T, MyResult> {
@Override
public MyResult execute() throws Exception {
// Process the input DataSet
return input.aggregate(/* custom aggregation */).collect().get(0);
}
}
// Configure memory for analytics
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setTaskManagerMemory(2048); // 2GB
// Use efficient data types
Graph<LongValue, NullValue, NullValue> efficientGraph = /* ... */;
// Set parallelism for analytics
env.setParallelism(4);
// Analytics will use the configured parallelism
VertexMetrics<Long, String, Double> metrics = new VertexMetrics<>();
metrics.setParallelism(8); // Override for specific analytic
Analytics use Flink's accumulator mechanism for efficient result collection:
// Results are automatically collected using accumulators
// No explicit collect() calls needed for analytics
MyResult result = graph.run(myAnalytic).execute();
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-gelly-2-10