This section covers built-in graph metrics, degree calculations, neighborhood operations, and custom reduction functions for graph analysis and statistical computation.
Compute vertex degrees for analyzing graph connectivity patterns.
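To make the three degree notions concrete, the following is a plain-Scala sketch over local collections. It is not Gelly's implementation and does not use Flink; the names `DegreeSketch`, `E`, and `count` are hypothetical. It assumes that a vertex with no matching edges is reported with degree 0, and it uses the same topology as the usage example later in this section.

```scala
// Plain-Scala model of in-, out-, and total degree (illustration only, no Flink).
object DegreeSketch {
  // A directed edge from `src` to `dst` (hypothetical helper type).
  final case class E(src: Long, dst: Long)

  // Count how often each vertex ID is produced by `endpoints` across all edges.
  // Assumption: vertices that never appear are reported with a count of 0.
  private def count(vertices: Seq[Long], edges: Seq[E])(endpoints: E => Seq[Long]): Map[Long, Long] = {
    val hits: Map[Long, Long] =
      edges.flatMap(endpoints).groupBy(identity).map { case (id, xs) => id -> xs.size.toLong }
    vertices.map(id => id -> hits.getOrElse(id, 0L)).toMap
  }

  // In-degree counts edges by target, out-degree by source, degree by both endpoints.
  def inDegrees(vs: Seq[Long], es: Seq[E]): Map[Long, Long] = count(vs, es)(e => Seq(e.dst))
  def outDegrees(vs: Seq[Long], es: Seq[E]): Map[Long, Long] = count(vs, es)(e => Seq(e.src))
  def degrees(vs: Seq[Long], es: Seq[E]): Map[Long, Long] = count(vs, es)(e => Seq(e.src, e.dst))

  // Topology of the usage example: 1->2, 2->3, 3->4, 1->4.
  val vertices: Seq[Long] = Seq(1L, 2L, 3L, 4L)
  val edges: Seq[E] = Seq(E(1L, 2L), E(2L, 3L), E(3L, 4L), E(1L, 4L))
}
```

In this model, the total degree of each vertex is the sum of its in-degree and out-degree.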
/**
* Return the in-degree of all vertices in the graph
* @return A DataSet of Tuple2<vertexId, inDegree>
*/
def inDegrees(): DataSet[(K, LongValue)]
/**
* Return the out-degree of all vertices in the graph
* @return A DataSet of Tuple2<vertexId, outDegree>
*/
def outDegrees(): DataSet[(K, LongValue)]
/**
* Return the degree of all vertices in the graph
* @return A DataSet of Tuple2<vertexId, degree>
*/
def getDegrees(): DataSet[(K, LongValue)]

Perform reduction operations over vertex neighborhoods for custom analytics.
/**
* Compute a reduce transformation over the neighbors' vertex values of each vertex.
* For each vertex, the transformation consecutively calls a
* ReduceNeighborsFunction until only a single value for each vertex remains.
* The ReduceNeighborsFunction combines a pair of neighbor vertex values
* into one new value of the same type.
* @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
* @param direction the edge direction (in-, out-, all-)
* @return a DataSet of Tuple2, with one tuple per vertex.
* The first field of the Tuple2 is the vertex ID and the second field
* is the aggregate value computed by the provided ReduceNeighborsFunction.
*/
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV],
                      direction: EdgeDirection): DataSet[(K, VV)]
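The pairwise reduction described above can be modeled with local collections. This is a sketch of the semantics only, not Gelly's implementation; `ReduceOnNeighborsSketch`, `Direction`, and `E` are hypothetical names. It assumes that vertices without any neighbor in the chosen direction produce no output, and it relies on the reduce function being associative and commutative, since a distributed reduce does not guarantee the order in which pairs are combined.

```scala
// Plain-Scala model of a pairwise reduce over neighbor values (illustration only).
object ReduceOnNeighborsSketch {
  // Local stand-in for the edge direction choice (hypothetical).
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  final case class E(src: Long, dst: Long)

  // For each vertex with at least one neighbor in direction `dir`,
  // combine the neighbors' values pairwise with `f` until one value remains.
  def reduceOnNeighbors(values: Map[Long, Double], edges: Seq[E], dir: Direction)
                       (f: (Double, Double) => Double): Map[Long, Double] = {
    // Emit (vertex, neighbor) pairs according to the direction.
    val pairs: Seq[(Long, Long)] = edges.flatMap { e =>
      dir match {
        case Out => Seq(e.src -> e.dst)
        case In  => Seq(e.dst -> e.src)
        case All => Seq(e.src -> e.dst, e.dst -> e.src)
      }
    }
    // Groups are never empty, so reduce is safe here.
    pairs.groupBy(_._1).map { case (id, ps) => id -> ps.map(p => values(p._2)).reduce(f) }
  }

  // Values and topology of the usage example in this section.
  val values: Map[Long, Double] = Map(1L -> 10.0, 2L -> 20.0, 3L -> 30.0, 4L -> 40.0)
  val edges: Seq[E] = Seq(E(1L, 2L), E(2L, 3L), E(3L, 4L), E(1L, 4L))
}
```

With `All` and `math.max`, vertex 2 sees neighbors 1 and 3 and keeps 30.0; with `Out`, vertex 4 has no neighbors and is absent from the result of this model.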
/**
* Compute a reduce transformation over the edge values of each vertex.
* For each vertex, the transformation consecutively calls a
* ReduceEdgesFunction until only a single value for each vertex remains.
* The ReduceEdgesFunction combines a pair of edge values
* into one new value of the same type.
* @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
* @param direction the edge direction (in-, out-, all-)
* @return a DataSet of Tuple2, with one tuple per vertex.
* The first field of the Tuple2 is the vertex ID and the second field
* is the aggregate value computed by the provided ReduceEdgesFunction.
*/
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV],
                  direction: EdgeDirection): DataSet[(K, EV)]

Perform more complex aggregations over neighborhoods using user-defined functions.
/**
* Compute an aggregate over the edges of each vertex. The function applied
* on the edges has access to the vertex value.
* @param edgesFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
* @tparam T the output type
* @return a DataSet of T
*/
def groupReduceOnEdges[T](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T],
                          direction: EdgeDirection): DataSet[T]
/**
* Compute an aggregate over the edges of each vertex. The function applied
* on the edges only has access to the vertex ID (not the vertex value).
* @param edgesFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
* @tparam T the output type
* @return a DataSet of T
*/
def groupReduceOnEdges[T](edgesFunction: EdgesFunction[K, EV, T],
                          direction: EdgeDirection): DataSet[T]
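Unlike the pairwise reduce functions, a group-reduce function sees all edges of a vertex at once and may emit zero, one, or several results of an arbitrary type. The sketch below models that contract with local collections for outgoing edges only. It is an illustration under stated assumptions, not Gelly's implementation; `GroupReduceOnEdgesSketch`, `E`, and `groupReduceOnOutEdges` are hypothetical names, and the returned `Seq` stands in for calls to a collector.

```scala
// Plain-Scala model of a group-reduce over the out-edges of each vertex (illustration only).
object GroupReduceOnEdgesSketch {
  final case class E(src: Long, dst: Long, weight: Double)

  // Group outgoing edges by source vertex and hand each group to `f`.
  // `f` returns the records it would emit; an empty Seq means "emit nothing".
  def groupReduceOnOutEdges[T](edges: Seq[E])(f: (Long, Seq[E]) => Seq[T]): Seq[T] =
    edges.groupBy(_.src).toSeq.sortBy(_._1).flatMap { case (id, es) => f(id, es) }

  // Edges of the usage example in this section.
  val edges: Seq[E] = Seq(E(1L, 2L, 1.5), E(2L, 3L, 2.5), E(3L, 4L, 3.5), E(1L, 4L, 4.5))

  // Emit (vertexId, edgeCount, minWeight, maxWeight), but only for vertices
  // with at least two outgoing edges; all other vertices emit nothing.
  val stats: Seq[(Long, Int, Double, Double)] =
    groupReduceOnOutEdges[(Long, Int, Double, Double)](edges) { (id, es) =>
      val ws = es.map(_.weight)
      if (es.size >= 2) Seq((id, es.size, ws.min, ws.max)) else Seq.empty
    }
}
```

Only vertex 1 has two outgoing edges in this graph, so `stats` holds a single record for it.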
/**
* Compute an aggregate over the neighbors (edges and vertices) of each
* vertex. The function applied on the neighbors has access to the vertex
* value.
* @param neighborsFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
* @tparam T the output type
* @return a DataSet of T
*/
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T],
                              direction: EdgeDirection): DataSet[T]
/**
* Compute an aggregate over the neighbors (edges and vertices) of each
* vertex.
* @param neighborsFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
* @tparam T the output type
* @return a DataSet of T
*/
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunction[K, VV, EV, T],
                              direction: EdgeDirection): DataSet[T]

Key function interfaces for implementing custom neighborhood reductions:
// From Java Gelly (Java interfaces, shown here in Scala notation): reduce functions for simple aggregations
trait ReduceNeighborsFunction[VV] {
  def reduceNeighbors(firstNeighborValue: VV, secondNeighborValue: VV): VV
}
trait ReduceEdgesFunction[EV] {
  def reduceEdges(firstEdgeValue: EV, secondEdgeValue: EV): EV
}

Control which edges to consider in neighborhood operations:
// From Java Gelly: org.apache.flink.graph.EdgeDirection is a Java enum
public enum EdgeDirection {
  IN,  // Consider only incoming edges
  OUT, // Consider only outgoing edges
  ALL  // Consider both incoming and outgoing edges
}

Usage Examples:
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceEdgesFunction, ReduceNeighborsFunction, Vertex}
import org.apache.flink.api.scala._
import org.apache.flink.types.LongValue
import org.apache.flink.util.Collector
val env = ExecutionEnvironment.getExecutionEnvironment
// Create sample graph with numeric vertex values
val vertices = env.fromCollection(Seq(
  new Vertex(1L, 10.0),
  new Vertex(2L, 20.0),
  new Vertex(3L, 30.0),
  new Vertex(4L, 40.0)
))
val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 1.5),
  new Edge(2L, 3L, 2.5),
  new Edge(3L, 4L, 3.5),
  new Edge(1L, 4L, 4.5)
))
val graph = Graph.fromDataSet(vertices, edges, env)
// Basic degree calculations
val inDegrees = graph.inDegrees() // DataSet[(Long, LongValue)]
val outDegrees = graph.outDegrees() // DataSet[(Long, LongValue)]
val allDegrees = graph.getDegrees() // DataSet[(Long, LongValue)]
// Simple reductions on neighbors
val maxNeighborValue = graph.reduceOnNeighbors(
  new ReduceNeighborsFunction[Double] {
    override def reduceNeighbors(first: Double, second: Double): Double = {
      math.max(first, second)
    }
  },
  EdgeDirection.ALL
)
val sumEdgeWeights = graph.reduceOnEdges(
  new ReduceEdgesFunction[Double] {
    override def reduceEdges(first: Double, second: Double): Double = {
      first + second
    }
  },
  EdgeDirection.OUT
)

// Custom edge analysis function
class EdgeStatistics extends EdgesFunction[Long, Double, (Long, Int, Double, Double)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])],
                            out: Collector[(Long, Int, Double, Double)]): Unit = {
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      val vertexId = edgeList.head._1
      val edgeCount = edgeList.size
      val weights = edgeList.map(_._2.getValue)
      val minWeight = weights.min
      val maxWeight = weights.max
      out.collect((vertexId, edgeCount, minWeight, maxWeight))
    }
  }
}
// Apply custom edge statistics
val edgeStats = graph.groupReduceOnEdges(new EdgeStatistics(), EdgeDirection.OUT)
// Custom neighbor analysis with vertex access
class NeighborAnalysis extends NeighborsFunctionWithVertexValue[Long, Double, Double, (Long, Double, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, Double],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val avgNeighborValue = neighborList.map(_._2.getValue).sum / neighborList.size
      val avgEdgeWeight = neighborList.map(_._1.getValue).sum / neighborList.size
      out.collect((vertex.getId, avgNeighborValue, avgEdgeWeight))
    }
  }
}
// Apply neighbor analysis
val neighborStats = graph.groupReduceOnNeighbors(new NeighborAnalysis(), EdgeDirection.ALL)

Calculate properties for individual vertices based on their neighborhoods:
// Vertex clustering coefficient
class ClusteringCoefficient extends NeighborsFunction[Long, Double, Double, (Long, Double)] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.size >= 2) {
      val vertexId = neighborList.head._1
      val neighborIds = neighborList.map(_._3.getId).toSet
      // Denominator of the coefficient: number of possible edges between neighbors
      val possibleEdges = neighborIds.size * (neighborIds.size - 1) / 2
      // Placeholder: the numerator (edges that actually connect two neighbors) is not
      // visible from inside this function, so this simplified example always emits 0.0
      val clustering = 0.0
      out.collect((vertexId, clustering))
    }
  }
}

Compute graph-wide statistics by combining local measurements:
// Combine degree calculations with other metrics
// collect() runs the job and returns the per-vertex degrees to the client
val degreeStats = graph.getDegrees().collect()
val avgDegree = degreeStats.map(_._2.getValue).sum / degreeStats.length.toDouble
val maxDegree = degreeStats.map(_._2.getValue).max
val minDegree = degreeStats.map(_._2.getValue).min

These analytics capabilities combine built-in metrics with user-defined neighborhood functions for custom graph analysis. The graph operations run as distributed Flink jobs, while the collect() call above brings the degree results to the client for local aggregation.
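The ClusteringCoefficient class earlier in this section leaves the neighbor-to-neighbor edge count as a placeholder, because a neighborhood function only sees the neighbors of one vertex. As a reference for the quantity itself, here is a plain-Scala sketch of the local clustering coefficient on a small in-memory graph. It is not Gelly code: `ClusteringSketch` and `localClustering` are hypothetical names, and the sketch assumes an undirected simple graph (no self-loops, no duplicate edges).

```scala
// Plain-Scala local clustering coefficient on an in-memory graph (illustration only).
object ClusteringSketch {
  // `edges` holds unordered pairs of distinct vertex IDs (assumption: no self-loops).
  def localClustering(edges: Set[(Long, Long)]): Map[Long, Double] = {
    // Symmetric adjacency sets: each edge contributes both directions.
    val adj: Map[Long, Set[Long]] =
      edges.toSeq
        .flatMap { case (a, b) => Seq(a -> b, b -> a) }
        .groupBy(_._1)
        .map { case (v, ps) => v -> ps.map(_._2).toSet }

    adj.map { case (v, ns) =>
      val k = ns.size
      // Number of neighbor pairs that could be connected.
      val possible = k * (k - 1) / 2
      // Number of neighbor pairs (a, b) with a < b that are actually connected.
      val ordered = ns.toSeq
      val links = (for { a <- ordered; b <- ordered if a < b && adj(a).contains(b) } yield 1).size
      v -> (if (possible > 0) links.toDouble / possible else 0.0)
    }
  }

  // A triangle 1-2-3 with a pendant vertex 4 attached to 3.
  val sample: Set[(Long, Long)] = Set((1L, 2L), (2L, 3L), (1L, 3L), (3L, 4L))
}
```

For `sample`, vertices 1 and 2 have a coefficient of 1.0, vertex 3 has one connected pair out of three possible, and vertex 4, with a single neighbor, gets 0.0. In a distributed setting, the numerator would require joining each vertex's neighbor pairs against the edge set rather than a local lookup.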