Complete API for graph analytics, metric calculations, and structural operations.
```scala
def inDegrees(): DataSet[(K, LongValue)]
```
Returns the in-degree of every vertex in the graph as a DataSet of (vertexId, inDegree) tuples.

```scala
def outDegrees(): DataSet[(K, LongValue)]
```
Returns the out-degree of every vertex in the graph as a DataSet of (vertexId, outDegree) tuples.

```scala
def getDegrees(): DataSet[(K, LongValue)]
```
Returns the total degree (in-degree + out-degree) of every vertex as a DataSet of (vertexId, degree) tuples.
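The following plain-Scala sketch makes the tuple layout of the three degree methods concrete. It is not Flink code. `DegreeModel` and its methods are hypothetical names, and the sketch returns in-memory `Map`s where Flink returns DataSets. Following the descriptions above, every vertex appears in the result, including vertices with degree 0.

```scala
// Plain-Scala model of inDegrees / outDegrees / getDegrees (not Flink code).
// Edges are (sourceId, targetId) pairs; edge values do not affect degrees.
object DegreeModel {
  type SimpleEdge = (Long, Long)

  // In-degree: number of edges whose target is the vertex.
  def inDegrees(vertices: Seq[Long], edges: Seq[SimpleEdge]): Map[Long, Long] =
    vertices.map(v => v -> edges.count(_._2 == v).toLong).toMap

  // Out-degree: number of edges whose source is the vertex.
  def outDegrees(vertices: Seq[Long], edges: Seq[SimpleEdge]): Map[Long, Long] =
    vertices.map(v => v -> edges.count(_._1 == v).toLong).toMap

  // Total degree: in-degree + out-degree.
  def degrees(vertices: Seq[Long], edges: Seq[SimpleEdge]): Map[Long, Long] = {
    val in = inDegrees(vertices, edges)
    val out = outDegrees(vertices, edges)
    vertices.map(v => v -> (in(v) + out(v))).toMap
  }
}
```

For example, with edges (1,2), (1,3) and (2,3), vertex 3 has in-degree 2, out-degree 0 and total degree 2. An isolated vertex 4 reports 0 for all three.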
```scala
def getUndirected(): Graph[K, VV, EV]
```
Creates an undirected version of the graph by adding the inverse of every edge. Each edge (u,v) results in both (u,v) and (v,u).

```scala
def reverse(): Graph[K, VV, EV]
```
Reverses the direction of every edge in the graph. Edge (u,v) becomes (v,u).
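The plain-Scala sketch below models `reverse` and `getUndirected` on an in-memory edge list, showing that edge values travel with the flipped edge. It is not Flink code. `OrientationModel`, its `Edge` case class and its methods are hypothetical.

```scala
// Plain-Scala model of reverse and getUndirected (not Flink code).
object OrientationModel {
  final case class Edge[EV](src: Long, trg: Long, value: EV)

  // reverse: (u, v) becomes (v, u); the edge value is kept.
  def reverse[EV](edges: Seq[Edge[EV]]): Seq[Edge[EV]] =
    edges.map(e => e.copy(src = e.trg, trg = e.src))

  // getUndirected: keep every edge and add its inverse.
  def undirected[EV](edges: Seq[Edge[EV]]): Seq[Edge[EV]] =
    edges ++ reverse(edges)
}
```

This model adds an inverse for every edge without checking for existing ones. An input that already contains both (u,v) and (v,u) would therefore end up with two copies of each.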
```scala
def validate(validator: GraphValidator[K, VV, EV]): Boolean
```
Validates the graph with the given validator and returns `true` if the graph passes the validator's checks.
Parameters:
- `validator` - the `GraphValidator` that defines the validation rules

```scala
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
```
Performs a union of the vertex and edge sets of the two graphs. Duplicate vertices are removed, but duplicate edges are kept.
Parameters:
- `graph` - the graph to perform the union with

```scala
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
```
Performs a difference on the vertex and edge sets, removing the vertices and edges the two graphs have in common. If a source or target vertex is removed, its edges are removed as well.
Parameters:
- `graph` - the graph to perform the difference with

```scala
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]
```
Performs an intersection on the edge sets of the two graphs. Two edges are considered equal if they have the same source ID, target ID, and edge value. As the return type shows, the vertices of the resulting graph carry no values (`NullValue`).
Parameters:
- `graph` - the graph to perform the intersection with
- `distinctEdges` - if `true`, exactly one edge represents all pairs of equal edges; if `false`, both edges of each pair are included

```scala
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]
```
Computes an aggregate over the edges of each vertex, without access to the vertex value.
Parameters:
- `edgesFunction` - the function applied to the edges of each vertex
- `direction` - the edge direction to consider (`IN`, `OUT`, or `ALL`)

```scala
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```
Computes an aggregate over the edges of each vertex, with access to the vertex value.
Parameters:
- `edgesFunction` - a function that receives both the vertex (including its value) and its edges
- `direction` - the edge direction to consider (`IN`, `OUT`, or `ALL`)

```scala
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```
Computes an aggregate over the neighborhood (edges and adjacent vertices) of each vertex.
Parameters:
- `neighborsFunction` - the function applied to the neighborhood of each vertex
- `direction` - the edge direction to consider (`IN`, `OUT`, or `ALL`)

```scala
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
```
Computes an aggregate over the neighborhood of each vertex, with access to the value of the vertex itself.
Parameters:
- `neighborsFunction` - a function that receives both the vertex and its neighbors
- `direction` - the edge direction to consider (`IN`, `OUT`, or `ALL`)

```scala
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]
```
Computes a reduce transformation over the neighbors' vertex values of each vertex. The function repeatedly combines pairs of neighbor values until a single value remains.
Parameters:
- `reduceNeighborsFunction` - the reduce function applied to the neighbor values
- `direction` - the edge direction to consider (`IN`, `OUT`, or `ALL`)

Returns: a DataSet of (vertexId, aggregatedValue) tuples
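The plain-Scala sketch below shows how `direction` selects the neighbors whose values are combined. It is not Flink code. `NeighborReduceModel`, its `In`/`Out`/`All` objects (stand-ins for `EdgeDirection`) and its methods are hypothetical. The model combines values in list order. In a distributed reduce the combination order is not something to rely on, so a function such as a sum, which is associative and commutative, is the safe choice. In this model, a vertex with no neighbors in the chosen direction gets no entry.

```scala
// Plain-Scala model of reduceOnNeighbors (not Flink code); edge values are omitted.
object NeighborReduceModel {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  // (vertexId, neighborId) pairs, one per edge that counts for the given direction.
  // Edges are (sourceId, targetId) pairs.
  private def neighborPairs(edges: Seq[(Long, Long)], dir: Direction): Seq[(Long, Long)] =
    dir match {
      case In  => edges.map(_.swap)          // neighbor is the source of an incoming edge
      case Out => edges                      // neighbor is the target of an outgoing edge
      case All => edges ++ edges.map(_.swap) // both kinds of neighbor
    }

  // Combine the neighbor values of each vertex pairwise with f.
  def reduceOnNeighbors[VV](values: Map[Long, VV], edges: Seq[(Long, Long)], dir: Direction)(
      f: (VV, VV) => VV): Map[Long, VV] =
    neighborPairs(edges, dir)
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(p => values(p._2)).reduce(f) }
}
```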
```scala
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]
```
Computes a reduce transformation over the edge values of each vertex. The function repeatedly combines pairs of edge values until a single value remains.
Parameters:
- `reduceEdgesFunction` - the reduce function applied to the edge values
- `direction` - the edge direction to consider (`IN`, `OUT`, or `ALL`)

Returns: a DataSet of (vertexId, aggregatedValue) tuples
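As a sketch of how `direction` decides which vertex each edge value is grouped under, here is a plain-Scala model of `reduceOnEdges`. It is not Flink code. `EdgeReduceModel`, its `Edge` case class, its `In`/`Out`/`All` objects and its methods are hypothetical. Incoming edges are keyed by their target and outgoing edges by their source. With `All`, every edge value counts toward both endpoints.

```scala
// Plain-Scala model of reduceOnEdges (not Flink code).
object EdgeReduceModel {
  final case class Edge[EV](src: Long, trg: Long, value: EV)

  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  // Key each edge value by the vertex it is grouped under for the given direction.
  private def keyed[EV](edges: Seq[Edge[EV]], dir: Direction): Seq[(Long, EV)] =
    dir match {
      case In  => edges.map(e => (e.trg, e.value))
      case Out => edges.map(e => (e.src, e.value))
      case All => edges.flatMap(e => Seq((e.src, e.value), (e.trg, e.value)))
    }

  // Combine the edge values of each vertex pairwise with f.
  def reduceOnEdges[EV](edges: Seq[Edge[EV]], dir: Direction)(f: (EV, EV) => EV): Map[Long, EV] =
    keyed(edges, dir).groupBy(_._1).map { case (v, kvs) => v -> kvs.map(_._2).reduce(f) }
}
```

Using the maximum as the reduce function, for instance, yields the heaviest incoming, outgoing, or incident edge of each vertex.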
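```scala
abstract class EdgesFunction[K, EV, T] {
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}
```
Abstract base class for functions that operate on the edges of a vertex. Each element of `edges` is a (vertexId, edge) pair, where vertexId is the ID of the vertex being processed.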
```scala
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] {
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}
```
Abstract base class for functions that operate on the edges of a vertex with access to the vertex value.
```scala
abstract class NeighborsFunction[K, VV, EV, T] {
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], out: Collector[T]): Unit
}
```
Abstract base class for functions that operate on the neighbors of a vertex (edges and adjacent vertices). Each element of `neighbors` is a (vertexId, edge, neighborVertex) triple.
```scala
abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] {
  def iterateNeighbors(vertex: Vertex[K, VV], neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])], out: Collector[T]): Unit
}
```
Abstract base class for functions that operate on the neighbors of a vertex with access to the vertex itself, including its value. Each element of `neighbors` is an (edge, neighborVertex) pair.
```java
// org.apache.flink.graph.EdgeDirection is a Java enum, used from both the Java and Scala APIs
public enum EdgeDirection {
    IN, OUT, ALL
}
```
Enumeration of the edge directions used in neighborhood operations:
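- `IN` - consider only incoming edges
- `OUT` - consider only outgoing edges
- `ALL` - consider both incoming and outgoing edges

```scala
// Assumes an existing graph: Graph[Long, String, Double] (and otherGraph of the same type)

// Calculate all degree metrics
val inDegrees = graph.inDegrees()
val outDegrees = graph.outDegrees()
val totalDegrees = graph.getDegrees()

// Find vertices with an out-degree greater than 10
val highOutDegree = outDegrees.filter(_._2.getValue > 10)
```

```scala
// Create an undirected version
val undirectedGraph = graph.getUndirected()

// Reverse all edges
val reversedGraph = graph.reverse()

// Combine with another graph
val combinedGraph = graph.union(otherGraph)

// Find the intersection with another graph
val intersection = graph.intersect(otherGraph, distinctEdges = true)
```

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction}
import org.apache.flink.graph.scala.EdgesFunction
import org.apache.flink.util.Collector

// Custom edge aggregation function: emits the sum of the edge values of each vertex
class SumEdgeValues extends EdgesFunction[Long, Double, Double] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])], out: Collector[Double]): Unit = {
    // Each element is (vertexId, edge); only the edge values are summed,
    // so the emitted value does not carry the vertex ID
    val sum = edges.map(_._2.getValue).sum
    out.collect(sum)
  }
}

// Apply the edge aggregation to outgoing edges
val edgeSums = graph.groupReduceOnEdges(new SumEdgeValues(), EdgeDirection.OUT)

// Reduce neighbor values: join each vertex's neighbor values with commas
// (the order in which values are combined is not guaranteed)
val joinedNeighborValues = graph.reduceOnNeighbors(
  new ReduceNeighborsFunction[String] {
    override def reduceNeighbors(firstNeighborValue: String, secondNeighborValue: String): String =
      firstNeighborValue + "," + secondNeighborValue
  },
  EdgeDirection.ALL
)
```

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.graph.scala.EdgesFunctionWithVertexValue
import org.apache.flink.util.Collector

// Calculate the average outgoing edge weight per vertex
class AverageEdgeWeight extends EdgesFunctionWithVertexValue[Long, String, Double, (Long, Double)] {
  override def iterateEdges(vertex: Vertex[Long, String], edges: Iterable[Edge[Long, Double]], out: Collector[(Long, Double)]): Unit = {
    // Materialize the edges once so they can be both summed and counted
    val edgeList = edges.toList
    // Skip vertices without edges in this direction (avoids dividing by zero)
    if (edgeList.nonEmpty) {
      val average = edgeList.map(_.getValue).sum / edgeList.size
      out.collect((vertex.getId, average))
    }
  }
}

val avgWeights = graph.groupReduceOnEdges(new AverageEdgeWeight(), EdgeDirection.OUT)
```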
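The set semantics of `union`, `difference`, and `intersect` from the method reference can be checked on small in-memory graphs with the plain-Scala sketch below. It is not Flink code. `SetOpsModel`, its case classes and its methods are hypothetical, and the sketch makes three assumptions beyond the descriptions:

- `difference` matches vertices by ID.
- `union` treats vertices as duplicates only when both ID and value are equal.
- In `intersect`, each edge instance belongs to at most one pair, so an edge occurring a times in one graph and b times in the other forms min(a, b) pairs.

Only the intersected edge set is modeled, not the valueless vertex set.

```scala
// Plain-Scala model of union, difference, and intersect (not Flink code).
object SetOpsModel {
  final case class Vertex(id: Long, value: String)
  final case class Edge(src: Long, trg: Long, value: Double)
  final case class SimpleGraph(vertices: Seq[Vertex], edges: Seq[Edge])

  // union: duplicate vertices are dropped, duplicate edges are kept.
  def union(g1: SimpleGraph, g2: SimpleGraph): SimpleGraph =
    SimpleGraph((g1.vertices ++ g2.vertices).distinct, g1.edges ++ g2.edges)

  // difference: drop vertices whose ID occurs in g2, then every edge with a dropped endpoint.
  def difference(g1: SimpleGraph, g2: SimpleGraph): SimpleGraph = {
    val removedIds = g2.vertices.map(_.id).toSet
    val kept = g1.vertices.filterNot(v => removedIds(v.id))
    val keptIds = kept.map(_.id).toSet
    SimpleGraph(kept, g1.edges.filter(e => keptIds(e.src) && keptIds(e.trg)))
  }

  // intersect (edge set only): edges match on (src, trg, value), and each
  // edge instance joins at most one pair.
  def intersectEdges(g1: SimpleGraph, g2: SimpleGraph, distinctEdges: Boolean): Seq[Edge] = {
    val countsInG2 = g2.edges.groupBy(identity).map { case (e, es) => e -> es.size }
    val pairedEdges = g1.edges.groupBy(identity).toSeq.flatMap { case (e, es) =>
      Seq.fill(math.min(es.size, countsInG2.getOrElse(e, 0)))(e)
    }
    if (distinctEdges) pairedEdges.distinct  // one edge for all pairs of equal edges
    else pairedEdges.flatMap(e => Seq(e, e)) // both edges of every pair
  }
}
```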