CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-gelly-2-10

Gelly: Flink Graph API - A comprehensive graph processing library for Apache Flink

Pending
Overview
Eval results
Files

docs/data-access.md

Data Access and Utilities

Gelly provides comprehensive methods for accessing graph data, computing neighborhoods, performing joins, and integrating with Flink's DataSet API. This includes graph validation, format conversion, and utility functions for common graph operations.

Capabilities

Basic Data Access

Access the fundamental graph components as Flink DataSets.

// Core data access: each accessor exposes one graph component as a Flink DataSet.
// All vertices of the graph.
public DataSet<Vertex<K, VV>> getVertices()
// All edges of the graph.
public DataSet<Edge<K, EV>> getEdges()
// Edges together with the values of both endpoint vertices:
// (srcId, trgId, srcValue, trgValue, edgeValue).
public DataSet<Triplet<K, VV, EV>> getTriplets()

// ID extraction
// IDs of all vertices.
public DataSet<K> getVertexIds()
// One ID pair per edge — presumably (sourceId, targetId); confirm against the Graph Javadoc.
public DataSet<Tuple2<K, K>> getEdgeIds()

Usage Example:

Graph<Long, String, Double> graph = /* ... */;

// Access vertices and edges
DataSet<Vertex<Long, String>> vertices = graph.getVertices();
DataSet<Edge<Long, Double>> edges = graph.getEdges();

// Process vertices
DataSet<String> vertexValues = vertices.map(vertex -> vertex.getValue());
vertexValues.print();

// Process edges
DataSet<Double> edgeWeights = edges.map(edge -> edge.getValue());

// Average weight: accumulate (sum, count) in a single reduce and divide at the end.
// A pairwise "(a + b) / 2" reduce is NOT the mean: its result depends on the order
// in which elements are combined. The result must also be collected before printing,
// because concatenating a DataSet into a String only prints the object itself.
List<Tuple2<Double, Long>> sumAndCount = edgeWeights
    .map(new MapFunction<Double, Tuple2<Double, Long>>() {
        @Override
        public Tuple2<Double, Long> map(Double weight) {
            return new Tuple2<>(weight, 1L);
        }
    })
    .reduce((a, b) -> new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1))
    .collect();
// An edgeless graph produces no element at all, so guard before dividing.
if (!sumAndCount.isEmpty()) {
    Tuple2<Double, Long> totals = sumAndCount.get(0);
    System.out.println("Average weight: " + (totals.f0 / totals.f1));
}

// Get triplets (edges with vertex values)
DataSet<Triplet<Long, String, Double>> triplets = graph.getTriplets();
triplets.print(); // Shows: (srcId, trgId, srcValue, trgValue, edgeValue)

Neighborhood Operations

Perform computations on vertex neighborhoods with various aggregation patterns.

Edge-Based Neighborhood Operations

Operate on edges incident to each vertex.

// Applies edgesFunction to the group of edges incident to each vertex in the given
// direction; the function may emit any number of results of type T per vertex.
public <T> DataSet<T> groupReduceOnEdges(
    EdgesFunction<K, EV, T> edgesFunction,
    EdgeDirection direction)

// Same as above, but the function also receives the vertex (ID and value)
// whose edges are being processed.
public <T> DataSet<T> groupReduceOnEdges(
    EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
    EdgeDirection direction)

// Pairwise-combines the values of the edges incident to each vertex.
// The result holds one (vertexId, reducedEdgeValue) pair per vertex, hence
// DataSet<Tuple2<K, EV>> rather than a bare DataSet<EV>.
public DataSet<Tuple2<K, EV>> reduceOnEdges(
    ReduceEdgesFunction<EV> reduceEdgesFunction,
    EdgeDirection direction)

Neighbor-Based Operations

Operate on neighboring vertices.

// Applies neighborsFunction to the neighbors (and connecting edges) of each vertex
// in the given direction; the function may emit any number of results of type T.
public <T> DataSet<T> groupReduceOnNeighbors(
    NeighborsFunction<K, VV, EV, T> neighborsFunction,
    EdgeDirection direction)

// Same as above, but the function also receives the vertex (ID and value)
// whose neighborhood is being processed.
public <T> DataSet<T> groupReduceOnNeighbors(
    NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
    EdgeDirection direction)

// Pairwise-combines the values of the neighbors of each vertex.
// The result holds one (vertexId, reducedNeighborValue) pair per vertex, hence
// DataSet<Tuple2<K, VV>> rather than a bare DataSet<VV>.
public DataSet<Tuple2<K, VV>> reduceOnNeighbors(
    ReduceNeighborsFunction<VV> reduceNeighborsFunction,
    EdgeDirection direction)

Usage Example:

// Out-degree per vertex: tally the entries in each vertex's group of outgoing edges
DataSet<Tuple2<Long, Integer>> outDegrees = graph.groupReduceOnEdges(
    new EdgesFunction<Long, Double, Tuple2<Long, Integer>>() {
        @Override
        public void iterateEdges(
            Iterable<Tuple2<Long, Edge<Long, Double>>> edges,
            Collector<Tuple2<Long, Integer>> out) {

            Long owner = null;
            int degree = 0;

            // f0 of every entry is the ID of the vertex this group belongs to
            for (Tuple2<Long, Edge<Long, Double>> entry : edges) {
                degree += 1;
                owner = entry.f0;
            }

            Tuple2<Long, Integer> result = new Tuple2<>(owner, degree);
            out.collect(result);
        }
    },
    EdgeDirection.OUT
);

// Concatenate the (String) values of all neighbors of each vertex, space-separated
DataSet<Tuple2<Long, String>> neighborSums = graph.groupReduceOnNeighbors(
    new NeighborsFunction<Long, String, Double, Tuple2<Long, String>>() {
        @Override
        public void iterateNeighbors(
            Iterable<Tuple3<Long, Vertex<Long, String>, Edge<Long, Double>>> neighbors,
            Collector<Tuple2<Long, String>> out) {

            Long owner = null;
            StringBuilder joined = new StringBuilder();

            // f0 = ID of the vertex being processed, f1 = the neighboring vertex
            for (Tuple3<Long, Vertex<Long, String>, Edge<Long, Double>> entry : neighbors) {
                owner = entry.f0;
                joined.append(entry.f1.getValue());
                joined.append(" ");
            }

            // trim() drops the separator appended after the last neighbor
            String text = joined.toString().trim();
            out.collect(new Tuple2<>(owner, text));
        }
    },
    EdgeDirection.ALL
);

Function Interfaces

Core interfaces for neighborhood operations.

EdgesFunction<K, EV, O>

Process edges incident to each vertex.

/**
 * User function applied to the edges incident to each vertex.
 *
 * @param <K>  vertex ID type
 * @param <EV> edge value type
 * @param <O>  type of the emitted results
 */
public interface EdgesFunction<K, EV, O> extends Function, Serializable {
    /**
     * Processes one group of incident edges.
     *
     * @param edges pairs whose first field is the ID of the vertex being processed
     *              and whose second field is one of its edges
     * @param out   collector receiving the results
     * @throws Exception if the user code fails
     */
    void iterateEdges(
        Iterable<Tuple2<K, Edge<K, EV>>> edges, 
        Collector<O> out) throws Exception;
}

EdgesFunctionWithVertexValue<K, VV, EV, T>

Process edges with access to the vertex value.

/**
 * User function applied to the edges incident to each vertex, with access to
 * the vertex itself (ID and value).
 *
 * @param <K>  vertex ID type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 * @param <T>  type of the emitted results
 */
public interface EdgesFunctionWithVertexValue<K, VV, EV, T> extends Function, Serializable {
    /**
     * Processes the edges of one vertex.
     *
     * @param vertex the vertex whose edges are being processed
     * @param edges  the edges incident to {@code vertex}
     * @param out    collector receiving the results
     * @throws Exception if the user code fails
     */
    void iterateEdges(
        Vertex<K, VV> vertex,
        Iterable<Edge<K, EV>> edges,
        Collector<T> out) throws Exception;
}

NeighborsFunction<K, VV, EV, T>

Process neighboring vertices and connecting edges.

/**
 * User function applied to the neighboring vertices of each vertex and the
 * edges connecting them.
 *
 * @param <K>  vertex ID type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 * @param <T>  type of the emitted results
 */
public interface NeighborsFunction<K, VV, EV, T> extends Function, Serializable {
    /**
     * Processes one neighborhood.
     *
     * @param neighbors triples of (ID of the vertex being processed, neighboring
     *                  vertex, connecting edge)
     * @param out       collector receiving the results
     * @throws Exception if the user code fails
     */
    void iterateNeighbors(
        Iterable<Tuple3<K, Vertex<K, VV>, Edge<K, EV>>> neighbors,
        Collector<T> out) throws Exception;
}

NeighborsFunctionWithVertexValue<K, VV, EV, T>

Process neighbors with access to the source vertex value.

/**
 * User function applied to the neighbors of each vertex, with access to the
 * vertex itself (ID and value).
 *
 * @param <K>  vertex ID type
 * @param <VV> vertex value type
 * @param <EV> edge value type
 * @param <T>  type of the emitted results
 */
public interface NeighborsFunctionWithVertexValue<K, VV, EV, T> extends Function, Serializable {
    /**
     * Processes the neighborhood of one vertex.
     *
     * @param vertex    the vertex whose neighbors are being processed
     * @param neighbors pairs of (neighboring vertex, connecting edge)
     * @param out       collector receiving the results
     * @throws Exception if the user code fails
     */
    void iterateNeighbors(
        Vertex<K, VV> vertex,
        Iterable<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors,
        Collector<T> out) throws Exception;
}

Reduce Functions

Simple reduction operations on edges and neighbors.

/**
 * Combines two edge values into one; used by {@code reduceOnEdges}.
 *
 * @param <EV> edge value type
 */
public interface ReduceEdgesFunction<EV> extends Function, Serializable {
    /**
     * @param firstEdgeValue  one edge value of the vertex
     * @param secondEdgeValue another edge value of the same vertex
     * @return the combined value
     * @throws Exception if the user code fails
     */
    EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue) throws Exception;
}

/**
 * Combines two neighbor values into one; used by {@code reduceOnNeighbors}.
 *
 * @param <VV> vertex value type
 */
public interface ReduceNeighborsFunction<VV> extends Function, Serializable {
    /**
     * @param firstNeighborValue  value of one neighbor of the vertex
     * @param secondNeighborValue value of another neighbor of the same vertex
     * @return the combined value
     * @throws Exception if the user code fails
     */
    VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue) throws Exception;
}

Usage Example:

// Largest outgoing edge weight per vertex: fold the weights pairwise with max
ReduceEdgesFunction<Double> keepLarger = (first, second) -> Math.max(first, second);
DataSet<Tuple2<Long, Double>> maxWeights =
    graph.reduceOnEdges(keepLarger, EdgeDirection.OUT);

// Join the values of all neighbors (any direction) with "_" as separator
ReduceNeighborsFunction<String> joinWithUnderscore = (first, second) -> first + "_" + second;
DataSet<Tuple2<Long, String>> concatenatedNeighbors =
    graph.reduceOnNeighbors(joinWithUnderscore, EdgeDirection.ALL);

Graph Joins

Join graph components with external DataSets.

Vertex Joins

Join vertices with external data.

// Joins the vertices with inputDataSet and lets vertexJoinFunction compute the
// vertex value from the current value and the matched input value.
// NOTE(review): presumably matching is on the vertex ID (first tuple field) — confirm.
public <T> Graph<K, VV, EV> joinWithVertices(
    DataSet<Tuple2<K, T>> inputDataSet, 
    VertexJoinFunction<VV, T> vertexJoinFunction)

// NOTE(review): this overload declares a Graph<K, T, EV> result, yet
// VertexJoinFunction<VV, T>.vertexJoin returns VV, so the join function cannot
// produce T-typed vertex values as written. Verify that this overload (and its
// keepUnmatchedVertices flag) exists in the Gelly Graph API before relying on it.
public <T> Graph<K, T, EV> joinWithVertices(
    DataSet<Tuple2<K, T>> inputDataSet, 
    VertexJoinFunction<VV, T> vertexJoinFunction,
    boolean keepUnmatchedVertices)

Edge Joins

Join edges with external data.

// Joins the edges with inputDataSet and lets edgeJoinFunction compute the edge
// value from the current value and the matched input value.
// NOTE(review): presumably matching is on (source ID, target ID), the first two
// tuple fields — confirm.
public <T> Graph<K, VV, EV> joinWithEdges(
    DataSet<Tuple3<K, K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction)

// NOTE(review): this overload declares a Graph<K, VV, T> result, yet
// EdgeJoinFunction<EV, T>.edgeJoin returns EV, so the join function cannot
// produce T-typed edge values as written. Verify that this overload (and its
// keepUnmatchedEdges flag) exists in the Gelly Graph API before relying on it.
public <T> Graph<K, VV, T> joinWithEdges(
    DataSet<Tuple3<K, K, T>> inputDataSet,
    EdgeJoinFunction<EV, T> edgeJoinFunction,
    boolean keepUnmatchedEdges)

Join Function Interfaces

/**
 * Computes a vertex value from the current vertex value and a matched input value.
 *
 * @param <VV> vertex value type (also the result type)
 * @param <T>  type of the joined input value
 */
public interface VertexJoinFunction<VV, T> extends Function, Serializable {
    /**
     * @param vertexValue the current value of the vertex
     * @param inputValue  the value from the joined input DataSet
     * @return the vertex value to use — note it has type {@code VV}, not {@code T}
     * @throws Exception if the user code fails
     */
    VV vertexJoin(VV vertexValue, T inputValue) throws Exception;
}

/**
 * Computes an edge value from the current edge value and a matched input value.
 *
 * @param <EV> edge value type (also the result type)
 * @param <T>  type of the joined input value
 */
public interface EdgeJoinFunction<EV, T> extends Function, Serializable {
    /**
     * @param edgeValue  the current value of the edge
     * @param inputValue the value from the joined input DataSet
     * @return the edge value to use — note it has type {@code EV}, not {@code T}
     * @throws Exception if the user code fails
     */
    EV edgeJoin(EV edgeValue, T inputValue) throws Exception;
}

Usage Example:

// Ages keyed by vertex ID, supplied from outside the graph
DataSet<Tuple2<Long, Integer>> vertexAges =
    env.fromElements(new Tuple2<>(1L, 25), new Tuple2<>(2L, 30), new Tuple2<>(3L, 35));

// Fold each age into the matching vertex's name; the vertex value stays a String
VertexJoinFunction<String, Integer> appendAge = (name, age) -> name + "_age" + age;
Graph<Long, String, Double> enrichedGraph = graph.joinWithVertices(vertexAges, appendAge);

// External edge data
DataSet<Tuple3<Long, Long, String>> edgeLabels = env.fromElements(
    new Tuple3<>(1L, 2L, "friendship"),
    new Tuple3<>(2L, 3L, "colleague")
);

// Join with edges to add labels.
// EdgeJoinFunction<EV, T>.edgeJoin returns EV, so a join cannot change the edge
// value type by itself. Convert the Double weights to String with mapEdges first,
// then join String edge values with the String labels.
Graph<Long, String, String> labeledGraph = enrichedGraph
    .mapEdges(new MapFunction<Edge<Long, Double>, String>() {
        @Override
        public String map(Edge<Long, Double> edge) {
            return String.valueOf(edge.getValue());
        }
    })
    .joinWithEdges(
        edgeLabels,
        new EdgeJoinFunction<String, String>() {
            @Override
            public String edgeJoin(String weight, String label) {
                return label + "_" + weight;
            }
        }
    );

Graph Validation

Validate graph structure and properties.

// Runs the given validator against this graph and returns its verdict
// (true when the graph passes the check). It does not return a graph.
public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception

// Base class for validation checks; subclass it (or use an anonymous subclass)
// and implement validate(Graph).
public abstract class GraphValidator<K, VV, EV> implements Serializable {
    public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
}

Usage Example:

// Custom validator to check for self-loops
GraphValidator<Long, String, Double> noSelfLoopValidator =
    new GraphValidator<Long, String, Double>() {
        @Override
        public boolean validate(Graph<Long, String, Double> graph) throws Exception {
            // A self-loop is an edge whose source and target are the same vertex
            DataSet<Edge<Long, Double>> selfLoops = graph.getEdges()
                .filter(edge -> edge.getSource().equals(edge.getTarget()));

            // count() triggers execution of the program
            long selfLoopCount = selfLoops.count();
            return selfLoopCount == 0;
        }
    };

// Validate graph: the result is a flag, not a new graph
boolean isValid = graph.validate(noSelfLoopValidator);

Utility Classes

Conversion utilities for different data formats.

Tuple Conversion Utilities

// Convert between tuples and graph types.
// Each class is a MapFunction, so it can be passed directly to DataSet.map(...).
// Tuple2<K, VV> -> Vertex<K, VV>
public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>>
// Tuple3<K, K, EV> -> Edge<K, EV>
public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>>
// Vertex<K, VV> -> Tuple2<K, VV>
public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>>
// Edge<K, EV> -> Tuple3<K, K, EV>
public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>>

Usage Example:

// Flatten both graph components into plain tuples so they can be exported
VertexToTuple2Map<Long, String> vertexToTuple = new VertexToTuple2Map<Long, String>();
DataSet<Tuple2<Long, String>> vertexTuples = graph.getVertices().map(vertexToTuple);

EdgeToTuple3Map<Long, Double> edgeToTuple = new EdgeToTuple3Map<Long, Double>();
DataSet<Tuple3<Long, Long, Double>> edgeTuples = graph.getEdges().map(edgeToTuple);

// Write each tuple set to its own CSV file
vertexTuples.writeAsCsv("vertices.csv");
edgeTuples.writeAsCsv("edges.csv");

// Round trip: turn the tuples back into Vertex/Edge objects ...
Tuple2ToVertexMap<Long, String> tupleToVertex = new Tuple2ToVertexMap<Long, String>();
DataSet<Vertex<Long, String>> importedVertices = vertexTuples.map(tupleToVertex);

Tuple3ToEdgeMap<Long, Double> tupleToEdge = new Tuple3ToEdgeMap<Long, Double>();
DataSet<Edge<Long, Double>> importedEdges = edgeTuples.map(tupleToEdge);

// ... and assemble a graph from them
Graph<Long, String, Double> importedGraph =
    Graph.fromDataSet(importedVertices, importedEdges, env);

Integration with Flink DataSet API

Seamless integration with Flink's DataSet operations.

Usage Example:

// Use Flink operations on graph components
DataSet<Vertex<Long, String>> vertices = graph.getVertices();

// Standard DataSet operations.
// Functions that RETURN a generic type (Vertex<...>, TupleN<...>) are written as
// anonymous classes: a lambda loses its generic return type to erasure, and Flink
// then cannot determine the result type automatically. Lambdas remain fine where
// no result type has to be inferred (e.g. filter).
DataSet<Vertex<Long, String>> filteredVertices = vertices
    .filter(vertex -> vertex.getValue().length() > 3)
    .map(new MapFunction<Vertex<Long, String>, Vertex<Long, String>>() {
        @Override
        public Vertex<Long, String> map(Vertex<Long, String> vertex) {
            return new Vertex<>(vertex.getId(), vertex.getValue().toUpperCase());
        }
    });

// Group operations: count vertices by the first character of their value
DataSet<Tuple2<String, Long>> vertexGroups = vertices
    .map(new MapFunction<Vertex<Long, String>, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(Vertex<Long, String> vertex) {
            return new Tuple2<>(vertex.getValue().substring(0, 1), 1L);
        }
    })
    .groupBy(0)
    .sum(1);

// Join with other DataSets (Vertex is a Tuple2, so "f0" is the vertex ID)
DataSet<Tuple2<Long, Integer>> externalData = /* ... */;
DataSet<Tuple3<Long, String, Integer>> joined = vertices
    .join(externalData)
    .where("f0").equalTo("f0")
    .with(new JoinFunction<Vertex<Long, String>, Tuple2<Long, Integer>,
            Tuple3<Long, String, Integer>>() {
        @Override
        public Tuple3<Long, String, Integer> join(
                Vertex<Long, String> vertex, Tuple2<Long, Integer> data) {
            return new Tuple3<>(vertex.getId(), vertex.getValue(), data.f1);
        }
    });

// Create new graph from processed data
Graph<Long, String, Double> processedGraph = Graph.fromDataSet(
    filteredVertices, graph.getEdges(), env);

Performance Optimization

Caching Graph Components

// The DataSet API has no cache() operator, so there is nothing to call on the
// result of getVertices()/getEdges(). Instead, obtain each component once and
// keep the handle, so every downstream operator is derived from the same DataSet.
DataSet<Vertex<Long, String>> vertices = graph.getVertices();
DataSet<Edge<Long, Double>> edges = graph.getEdges();

// Reuse the same handles rather than calling the accessors again
DataSet<String> values1 = vertices.map(v -> v.getValue());
DataSet<String> values2 = vertices.map(v -> v.getValue().toUpperCase());

Efficient Data Types

// Use value types (LongValue, StringValue, DoubleValue) for better performance
Graph<LongValue, StringValue, DoubleValue> efficientGraph = /* ... */;

// Avoid boxed primitives (Long, Double) for large-scale operations;
// this declaration is the less efficient counterpart of the one above
Graph<Long, String, Double> lessEfficientGraph = /* ... */;

Parallel Processing

// Configure parallelism for data access operations on the execution environment
env.setParallelism(8);

// Operations will use configured parallelism
// NOTE(review): presumably this only holds for operators that do not set their
// own parallelism — confirm against the Flink documentation.
DataSet<Vertex<Long, String>> vertices = graph.getVertices(); // Uses parallelism 8

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-gelly-2-10

docs

algorithms.md

analytics.md

data-access.md

generators.md

graph-creation.md

index.md

iterative-processing.md

tile.json