tessl/maven-org-apache-flink--flink-gelly-2-10

Gelly: Flink Graph API - A comprehensive graph processing library for Apache Flink


Graph Analytics and Metrics

Gelly's Analytics framework (ASM - Analytics, Statistics, Metrics) provides comprehensive tools for computing graph statistics, degree distributions, and structural metrics. The framework includes both graph-level analytics and general DataSet analytics with efficient accumulator-based result collection.

Capabilities

Analytics Execution Interface

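Analytics implement either GraphAnalytic for graph-specific analysis or DataSetAnalytic for general dataset analysis. In both interfaces, run attaches the analytic to its input and returns the analytic, execute runs the Flink job and returns the result, and getResult returns the result once the job has executed.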

public interface GraphAnalytic<K, VV, EV, T> {
    T getResult();
    T execute() throws Exception;
    T execute(String executionName) throws Exception;
    GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input) throws Exception;
}

public interface DataSetAnalytic<T, R> {
    R getResult();
    R execute() throws Exception;
    R execute(String executionName) throws Exception;
    DataSetAnalytic<T, R> run(DataSet<T> input);
}

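// Execute an analytic on the graph (method of Graph); returns the analytic itself,
// so call execute() on the returned object to obtain the result
public <T> GraphAnalytic<K, VV, EV, T> run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception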
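
The interfaces above separate three steps: attaching an analytic to its input, triggering the computation, and reading the result. The following plain-Java sketch mimics that lifecycle without Flink. `SketchAnalytic` and `SumAnalytic` are hypothetical names invented for illustration, and a `List` stands in for a DataSet, so this shows the calling pattern only, not Gelly's implementation.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, Flink-free stand-in for the run/execute/getResult contract. */
interface SketchAnalytic<I, R> {
    SketchAnalytic<I, R> run(List<I> input); // attach the input, compute nothing yet
    R execute();                             // trigger the computation, then return getResult()
    R getResult();                           // only valid after execute()
}

/** Sums a list of longs, deferring the work until execute() is called. */
final class SumAnalytic implements SketchAnalytic<Long, Long> {
    private List<Long> input;
    private Long result;

    @Override
    public SumAnalytic run(List<Long> input) {
        this.input = input;
        return this; // returning the analytic allows run(...).execute() chaining
    }

    @Override
    public Long execute() {
        if (input == null) {
            throw new IllegalStateException("run(...) must be called before execute()");
        }
        long sum = 0L;
        for (long value : input) {
            sum += value;
        }
        result = sum;
        return getResult();
    }

    @Override
    public Long getResult() {
        if (result == null) {
            throw new IllegalStateException("execute() has not been called yet");
        }
        return result;
    }

    public static void main(String[] args) {
        Long total = new SumAnalytic().run(Arrays.asList(1L, 2L, 3L)).execute();
        System.out.println(total); // prints "6"
    }
}
```

Returning the analytic from `run` rather than the result is what makes it possible to attach several analytics first and read each result after the computation has been triggered.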

Basic Graph Statistics

Core graph statistics available as direct methods on the Graph class.

// Vertex and edge counts
public long numberOfVertices() throws Exception
public long numberOfEdges() throws Exception

// Degree computations
public DataSet<Tuple2<K, LongValue>> inDegrees()
public DataSet<Tuple2<K, LongValue>> outDegrees()
public DataSet<Tuple2<K, LongValue>> getDegrees()

Usage Example:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, String, Double> graph = /* ... */;

// Basic statistics
long vertexCount = graph.numberOfVertices();
long edgeCount = graph.numberOfEdges();

System.out.println("Graph has " + vertexCount + " vertices and " + edgeCount + " edges");

// Degree analysis
DataSet<Tuple2<Long, LongValue>> degrees = graph.getDegrees();
degrees.print(); // Print all vertex degrees

// Find vertices with high degree
DataSet<Tuple2<Long, LongValue>> highDegree = degrees
    .filter(degree -> degree.f1.getValue() > 5);
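
To make the relationship between the three degree methods concrete, here is a plain-Java sketch (no Flink) that derives in-degree, out-degree, and total degree from a directed edge list. `DegreeSketch` is a hypothetical helper, edges are modeled as `{source, target}` arrays, and vertices that appear in no edge are not represented because the sketch only sees edges.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class DegreeSketch {
    /** Returns vertex id -> {inDegree, outDegree} for a list of {source, target} edges. */
    static Map<Long, long[]> degrees(List<long[]> edges) {
        Map<Long, long[]> result = new TreeMap<>();
        for (long[] edge : edges) {
            result.computeIfAbsent(edge[0], k -> new long[2])[1]++; // out-degree of the source
            result.computeIfAbsent(edge[1], k -> new long[2])[0]++; // in-degree of the target
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> edges = Arrays.asList(
            new long[] {1, 2}, new long[] {1, 3}, new long[] {2, 3});
        for (Map.Entry<Long, long[]> entry : degrees(edges).entrySet()) {
            long in = entry.getValue()[0];
            long out = entry.getValue()[1];
            // The total degree is the sum of in-degree and out-degree
            System.out.println(entry.getKey() + ": in=" + in + " out=" + out + " total=" + (in + out));
        }
    }
}
```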

DataSet Analytics

General-purpose analytics for any DataSet, including graph components.

Count Analytics

Count elements in a DataSet with optional execution naming.

public class Count<T> extends AbstractDataSetAnalytic<T, LongValue> {
    public Count()
}

Usage Example:

// Count vertices
Count<Vertex<Long, String>> vertexCount = new Count<>();
LongValue count = vertexCount.run(graph.getVertices()).execute();

// Count edges
Count<Edge<Long, Double>> edgeCount = new Count<>();
LongValue edgeCountResult = edgeCount.run(graph.getEdges()).execute();

Collect Analytics

Collect DataSet elements to the driver program.

public class Collect<T> extends AbstractDataSetAnalytic<T, List<T>> {
    public Collect()
}

Usage Example:

// Collect small datasets
Collect<Vertex<Long, String>> collector = new Collect<>();
List<Vertex<Long, String>> vertices = collector.run(graph.getVertices()).execute();

for (Vertex<Long, String> vertex : vertices) {
    System.out.println("Vertex: " + vertex.getId() + " = " + vertex.getValue());
}

Checksum Analytics

Compute checksums for data validation and testing.

public class ChecksumHashCode<T> extends AbstractDataSetAnalytic<T, Tuple3<IntValue, LongValue, LongValue>> {
    public ChecksumHashCode()
}

Usage Example:

// Compute checksum of vertices
ChecksumHashCode<Vertex<Long, String>> checksum = new ChecksumHashCode<>();
Tuple3<IntValue, LongValue, LongValue> result = checksum.run(graph.getVertices()).execute();

// Result contains: (hashCode, count, checksum)
System.out.println("Hash: " + result.f0.getValue());
System.out.println("Count: " + result.f1.getValue()); 
System.out.println("Checksum: " + result.f2.getValue());
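
Why a checksum is useful for validation can be shown with a small Flink-free sketch. It assumes one simple scheme, an element count plus the sum of element hash codes; how ChecksumHashCode actually combines hash codes is not stated here, so treat `ChecksumSketch` as a hypothetical illustration of the idea. Because addition is commutative, two collections with the same elements in a different order produce the same pair, which matters for distributed results that arrive in no fixed order.

```java
import java.util.Arrays;
import java.util.List;

final class ChecksumSketch {
    /** Returns {count, checksum}, where checksum is the sum of element hash codes (assumed scheme). */
    static long[] checksum(List<?> elements) {
        long count = 0L;
        long sum = 0L;
        for (Object element : elements) {
            count++;
            sum += element.hashCode(); // order of elements does not affect the sum
        }
        return new long[] {count, sum};
    }

    public static void main(String[] args) {
        long[] first = checksum(Arrays.asList("a", "b", "c"));
        long[] second = checksum(Arrays.asList("c", "a", "b"));
        System.out.println(Arrays.equals(first, second)); // prints "true"
    }
}
```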

Degree-Based Analytics

Comprehensive degree analysis for both directed and undirected graphs.

Directed Graph Degree Analytics

Analytics for directed graphs with separate in-degree and out-degree analysis.

// Vertex degree annotation
public class VertexDegrees<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Vertex<K, Tuple3<K, LongValue, LongValue>>>>

public class VertexInDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Vertex<K, Tuple2<K, LongValue>>>>

public class VertexOutDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Vertex<K, Tuple2<K, LongValue>>>>

// Edge degree annotation  
public class EdgeDegreesPair<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple4<K, LongValue, LongValue, EV>>>>

public class EdgeSourceDegrees<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple4<K, LongValue, LongValue, EV>>>>

public class EdgeTargetDegrees<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple4<K, LongValue, LongValue, EV>>>>

Usage Example:

// Annotate vertices with in-degree and out-degree
VertexDegrees<Long, String, Double> vertexDegrees = new VertexDegrees<>();
DataSet<Vertex<Long, Tuple3<Long, LongValue, LongValue>>> degreeResult = graph.run(vertexDegrees);

// Result format: Vertex<ID, (originalID, inDegree, outDegree)>
degreeResult.print();

// Annotate edges with source and target degrees
EdgeDegreesPair<Long, String, Double> edgeDegrees = new EdgeDegreesPair<>();
DataSet<Edge<Long, Tuple4<Long, LongValue, LongValue, Double>>> edgeResult = graph.run(edgeDegrees);

// Result format: Edge<source, target, (targetID, sourceInDegree, sourceOutDegree, originalEdgeValue)>
edgeResult.print();

Undirected Graph Degree Analytics

Analytics for undirected graphs with single degree values.

// Vertex degree annotation
public class VertexDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Vertex<K, Tuple2<K, LongValue>>>>

// Edge degree annotation
public class EdgeDegreePair<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple3<K, LongValue, LongValue>>>>

public class EdgeSourceDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple2<K, LongValue>>>>

public class EdgeTargetDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, DataSet<Edge<K, Tuple2<K, LongValue>>>>

Usage Example:

// For undirected graph analysis
Graph<Long, String, Double> undirectedGraph = graph.getUndirected();

// Annotate vertices with degree
VertexDegree<Long, String, Double> vertexDegree = new VertexDegree<>();
DataSet<Vertex<Long, Tuple2<Long, LongValue>>> result = undirectedGraph.run(vertexDegree);

// Result format: Vertex<ID, (originalID, degree)>
result.print();

// Find high-degree vertices
DataSet<Vertex<Long, Tuple2<Long, LongValue>>> highDegreeVertices = result
    .filter(vertex -> vertex.getValue().f1.getValue() > 10);

Degree Filtering

Filter graphs based on degree constraints.

// Filter by maximum degree (undirected graphs)
public class MaximumDegree<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, Graph<K, VV, EV>> {
    public MaximumDegree(long maximumDegree)
}

Usage Example:

// Filter graph to only include vertices with degree <= 5
MaximumDegree<Long, String, Double> filter = new MaximumDegree<>(5L);
Graph<Long, String, Double> filteredGraph = graph.run(filter);

System.out.println("Original vertices: " + graph.numberOfVertices());
System.out.println("Filtered vertices: " + filteredGraph.numberOfVertices());
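
The effect of a maximum-degree filter on an undirected graph can be sketched in plain Java as follows. `MaximumDegreeSketch` is a hypothetical helper, not the Gelly class: it computes each vertex's degree once on the original edge list and keeps only the edges whose two endpoints are both within the limit. It does not re-check degrees after removal, and it assumes each undirected edge is listed once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class MaximumDegreeSketch {
    /** Keeps the undirected edges whose endpoints both have degree <= maximumDegree. */
    static List<long[]> filterEdges(List<long[]> edges, long maximumDegree) {
        // Degree of every vertex in the original, unfiltered graph
        Map<Long, Long> degree = new HashMap<>();
        for (long[] edge : edges) {
            degree.merge(edge[0], 1L, Long::sum);
            degree.merge(edge[1], 1L, Long::sum);
        }
        // An edge survives only if neither endpoint exceeds the limit
        List<long[]> kept = new ArrayList<>();
        for (long[] edge : edges) {
            if (degree.get(edge[0]) <= maximumDegree && degree.get(edge[1]) <= maximumDegree) {
                kept.add(edge);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Vertex 0 is a hub with degree 3; vertices 1 and 2 are also connected to each other
        List<long[]> edges = Arrays.asList(
            new long[] {0, 1}, new long[] {0, 2}, new long[] {0, 3}, new long[] {1, 2});
        for (long[] edge : filterEdges(edges, 2L)) {
            System.out.println(edge[0] + " - " + edge[1]);
        }
    }
}
```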

Graph Metrics

Comprehensive metrics for graph structure analysis.

Directed Graph Metrics

// Package: org.apache.flink.graph.library.metric.directed
public class EdgeMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, EdgeMetrics.Result> {
    public EdgeMetrics()
}

public class VertexMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, VertexMetrics.Result> {
    public VertexMetrics()
}

Undirected Graph Metrics

// Package: org.apache.flink.graph.library.metric.undirected
public class EdgeMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, EdgeMetrics.Result> {
    public EdgeMetrics()
}

public class VertexMetrics<K, VV, EV> implements GraphAnalytic<K, VV, EV, VertexMetrics.Result> {
    public VertexMetrics()
}

Usage Example:

// Compute comprehensive vertex metrics
VertexMetrics<Long, String, Double> vertexMetrics = 
    new org.apache.flink.graph.library.metric.undirected.VertexMetrics<>();
VertexMetrics.Result metrics = graph.run(vertexMetrics).execute();

// Access various metrics
System.out.println("Vertex count: " + metrics.getNumberOfVertices());
System.out.println("Edge count: " + metrics.getNumberOfEdges());
System.out.println("Average degree: " + metrics.getAverageDegree());
System.out.println("Density: " + metrics.getDensity());
System.out.println("Triplet count: " + metrics.getNumberOfTriplets());

// Compute edge metrics
EdgeMetrics<Long, String, Double> edgeMetrics = 
    new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<>();
EdgeMetrics.Result edgeResult = graph.run(edgeMetrics).execute();
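
For readers who want to sanity-check metric values on a small graph, the sketch below computes average degree, density, and triplet count in plain Java using the textbook definitions for a simple undirected graph: average degree 2E/V, density 2E/(V(V-1)), and triplets as the sum of d(d-1)/2 over all vertices. Whether the library's Result getters use exactly these formulas is an assumption, and `VertexMetricsSketch` is a hypothetical name. Each undirected edge is expected to appear once, with no self-loops.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VertexMetricsSketch {
    final long vertexCount;
    final long edgeCount;
    final long tripletCount;

    VertexMetricsSketch(List<long[]> undirectedEdges) {
        Map<Long, Long> degree = new HashMap<>();
        for (long[] edge : undirectedEdges) {
            degree.merge(edge[0], 1L, Long::sum);
            degree.merge(edge[1], 1L, Long::sum);
        }
        long triplets = 0L;
        for (long d : degree.values()) {
            triplets += d * (d - 1) / 2; // pairs of neighbors centered on this vertex
        }
        this.vertexCount = degree.size();
        this.edgeCount = undirectedEdges.size();
        this.tripletCount = triplets;
    }

    /** Textbook average degree of an undirected graph: 2E / V. */
    double averageDegree() {
        return vertexCount == 0 ? Double.NaN : 2.0 * edgeCount / vertexCount;
    }

    /** Textbook density of a simple undirected graph: 2E / (V * (V - 1)). */
    double density() {
        return vertexCount <= 1
            ? Double.NaN
            : 2.0 * edgeCount / (vertexCount * (double) (vertexCount - 1));
    }

    public static void main(String[] args) {
        // A triangle: every vertex has degree 2
        VertexMetricsSketch m = new VertexMetricsSketch(Arrays.asList(
            new long[] {1, 2}, new long[] {2, 3}, new long[] {1, 3}));
        System.out.println("Average degree: " + m.averageDegree());
        System.out.println("Density: " + m.density());
        System.out.println("Triplet count: " + m.tripletCount);
    }
}
```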

Result Types

Common result types used by analytics.

// Unary result (single value)
public class UnaryResult<T> implements PrintableResult {
    public T getValue()
}

// Binary result (two values)  
public class BinaryResult<T0, T1> implements PrintableResult {
    public T0 getValue0()
    public T1 getValue1()
}

// Tertiary result (three values)
public class TertiaryResult<T0, T1, T2> implements PrintableResult {
    public T0 getValue0()
    public T1 getValue1() 
    public T2 getValue2()
}

// Printable result interface
public interface PrintableResult {
    String toPrintableString()
}

Custom Analytics Development

Creating Graph Analytics

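public class CustomGraphAnalytic<K, VV, EV> extends AbstractGraphAnalytic<K, VV, EV, MyResult> {

    // Input graph, captured in run() so that execute() can use it
    private Graph<K, VV, EV> input;
    private MyResult result;

    @Override
    public CustomGraphAnalytic<K, VV, EV> run(Graph<K, VV, EV> input) throws Exception {
        super.run(input);
        this.input = input;
        return this;
    }

    @Override
    public MyResult execute() throws Exception {
        // Implementation using graph data
        DataSet<Vertex<K, VV>> vertices = input.getVertices();
        DataSet<Edge<K, EV>> edges = input.getEdges();

        // Compute custom analysis and keep it for getResult()
        result = computeCustomMetric(vertices, edges);
        return result;
    }

    @Override
    public MyResult getResult() {
        return result;
    }

    private MyResult computeCustomMetric(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges) {
        // Custom computation logic
        return new MyResult();
    }
}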

Creating DataSet Analytics

public class CustomDataSetAnalytic<T> extends AbstractDataSetAnalytic<T, MyResult> {
    
    @Override
    public MyResult execute() throws Exception {
        // Process the input DataSet
        return input.aggregate(/* custom aggregation */).collect().get(0);
    }
}

Performance Optimization

Memory Management

// TaskManager memory is a cluster setting rather than an ExecutionConfig option.
// Set it in flink-conf.yaml, for example: taskmanager.heap.mb: 2048 (2 GB)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// Use efficient data types
Graph<LongValue, NullValue, NullValue> efficientGraph = /* ... */;

Parallel Execution

// Set parallelism for analytics
env.setParallelism(4);

// Analytics will use the configured parallelism
VertexMetrics<Long, String, Double> metrics = new VertexMetrics<>();
metrics.setParallelism(8); // Override for specific analytic

Accumulator-Based Results

Analytics use Flink's accumulator mechanism for efficient result collection:

// Results are automatically collected using accumulators
// No explicit collect() calls needed for analytics
MyResult result = graph.run(myAnalytic).execute();
