Graph Analytics

Built-in graph metrics, degree calculations, neighborhood operations, and custom reduction functions for comprehensive graph analysis and statistical computation.

Capabilities

Degree Calculations

Compute vertex degrees for analyzing graph connectivity patterns.

/**
 * Return the in-degree of all vertices in the graph
 * @return A DataSet of Tuple2<vertexId, inDegree>
 */
def inDegrees(): DataSet[(K, LongValue)]

/**
 * Return the out-degree of all vertices in the graph
 * @return A DataSet of Tuple2<vertexId, outDegree>
 */
def outDegrees(): DataSet[(K, LongValue)]

/**
 * Return the degree of all vertices in the graph
 * @return A DataSet of Tuple2<vertexId, degree>
 */
def getDegrees(): DataSet[(K, LongValue)]
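
To make the three degree notions concrete, here is a minimal plain-Scala sketch that models them on a hypothetical four-vertex directed graph, with no Flink dependency. The object name, the sample data, and the assumption that vertices without matching edges are reported with degree 0 are illustrative choices, not part of the Gelly API.

```scala
object DegreeSketch {
  // Hypothetical sample graph: vertex IDs and directed edges as (source, target)
  val vertexIds: Seq[Long] = Seq(1L, 2L, 3L, 4L)
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (2L, 3L), (3L, 4L), (1L, 4L))

  // In-degree: number of edges whose target is the vertex
  def inDegrees: Map[Long, Long] =
    vertexIds.map(v => v -> edges.count(_._2 == v).toLong).toMap

  // Out-degree: number of edges whose source is the vertex
  def outDegrees: Map[Long, Long] =
    vertexIds.map(v => v -> edges.count(_._1 == v).toLong).toMap

  // Total degree: in-degree plus out-degree
  def degrees: Map[Long, Long] =
    vertexIds.map(v => v -> (inDegrees(v) + outDegrees(v))).toMap
}
```

On this sample, vertex 1 has in-degree 0 and out-degree 2, vertex 4 has in-degree 2 and out-degree 0, and every vertex has total degree 2.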

Neighborhood Reduction Operations

Perform reduction operations over vertex neighborhoods for custom analytics.

/**
 * Compute a reduce transformation over the neighbors' vertex values of each vertex.
 * For each vertex, the transformation consecutively calls a
 * ReduceNeighborsFunction until only a single value for each vertex remains.
 * The ReduceNeighborsFunction combines a pair of neighbor vertex values
 * into one new value of the same type.
 * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
 * @param direction the edge direction (in-, out-, all-)
 * @return a Dataset of Tuple2, with one tuple per vertex.
 * The first field of the Tuple2 is the vertex ID and the second field
 * is the aggregate value computed by the provided ReduceNeighborsFunction.
 */
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], 
                      direction: EdgeDirection): DataSet[(K, VV)]

/**
 * Compute a reduce transformation over the edge values of each vertex.
 * For each vertex, the transformation consecutively calls a
 * ReduceEdgesFunction until only a single value for each vertex remains.
 * The ReduceEdgesFunction combines a pair of edge values
 * into one new value of the same type.
 * @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
 * @param direction the edge direction (in-, out-, all-)
 * @return a Dataset of Tuple2, with one tuple per vertex.
 * The first field of the Tuple2 is the vertex ID and the second field
 * is the aggregate value computed by the provided ReduceEdgesFunction.
 */
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], 
                  direction: EdgeDirection): DataSet[(K, EV)]
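
The effect of the direction argument on these pairwise reductions can be modeled with ordinary Scala collections. The sketch below is an illustration of the semantics as described above, not Gelly code: the Direction type, WeightedEdge, and the sample data are hypothetical, and it assumes that a vertex with no edges in the chosen direction produces no output tuple.

```scala
object ReduceSketch {
  // Hypothetical stand-in for the edge direction argument
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  final case class WeightedEdge(source: Long, target: Long, weight: Double)

  // Hypothetical sample graph
  val vertexValues: Map[Long, Double] =
    Map(1L -> 10.0, 2L -> 20.0, 3L -> 30.0, 4L -> 40.0)
  val edges: Seq[WeightedEdge] = Seq(
    WeightedEdge(1L, 2L, 1.5),
    WeightedEdge(2L, 3L, 2.5),
    WeightedEdge(3L, 4L, 3.5),
    WeightedEdge(1L, 4L, 4.5)
  )

  // (vertexId, neighborId, edgeWeight) triples each vertex sees for a direction
  private def incident(direction: Direction): Seq[(Long, Long, Double)] = {
    val out = edges.map(e => (e.source, e.target, e.weight))
    val in = edges.map(e => (e.target, e.source, e.weight))
    direction match {
      case Out => out
      case In  => in
      case All => out ++ in
    }
  }

  // Pairwise reduction over the edge values of each vertex
  def reduceOnEdges(direction: Direction)(f: (Double, Double) => Double): Map[Long, Double] =
    incident(direction).groupBy(_._1).map {
      case (id, triples) => id -> triples.map(_._3).reduce(f)
    }

  // Pairwise reduction over the vertex values of each vertex's neighbors
  def reduceOnNeighbors(direction: Direction)(f: (Double, Double) => Double): Map[Long, Double] =
    incident(direction).groupBy(_._1).map {
      case (id, triples) => id -> triples.map(t => vertexValues(t._2)).reduce(f)
    }
}
```

With this sample, summing edge weights over Out yields entries only for vertices 1, 2 and 3, because vertex 4 has no outgoing edges, while the maximum neighbor value over All yields an entry for every vertex.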

Group Reduction Operations

Perform more complex aggregations over neighborhoods using user-defined functions.

/**
 * Compute an aggregate over the edges of each vertex. The function applied
 * on the edges has access to the vertex value.
 * @param edgesFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a dataset of a T
 */
def groupReduceOnEdges[T](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], 
                          direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the edges of each vertex. The function applied
 * on the edges only has access to the vertex ID (not the vertex value).
 * @param edgesFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a dataset of a T
 */
def groupReduceOnEdges[T](edgesFunction: EdgesFunction[K, EV, T],
                          direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the neighbors (edges and vertices) of each
 * vertex. The function applied on the neighbors has access to the vertex
 * value.
 * @param neighborsFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a dataset of a T
 */
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T],
                              direction: EdgeDirection): DataSet[T]

/**
 * Compute an aggregate over the neighbors (edges and vertices) of each
 * vertex. The function applied on the neighbors only has access to the
 * vertex ID (not the vertex value).
 * @param neighborsFunction the function to apply to the neighborhood
 * @param direction the edge direction (in-, out-, all-)
 * @tparam T the output type
 * @return a dataset of a T
 */
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunction[K, VV, EV, T],
                              direction: EdgeDirection): DataSet[T]
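
What distinguishes a group reduction from the pairwise reductions is that the user function receives a vertex's whole neighborhood at once and may emit zero, one, or several results of a different type. The following plain-Scala sketch models that idea for outgoing edges only; groupReduceOnOutEdges, WeightedEdge, and the sample data are hypothetical names for illustration and are not the Gelly signatures.

```scala
object GroupReduceSketch {
  final case class WeightedEdge(source: Long, target: Long, weight: Double)

  // Hypothetical sample edges
  val edges: Seq[WeightedEdge] = Seq(
    WeightedEdge(1L, 2L, 1.5),
    WeightedEdge(2L, 3L, 2.5),
    WeightedEdge(3L, 4L, 3.5),
    WeightedEdge(1L, 4L, 4.5)
  )

  // Model of a group reduce over out-edges: the user function sees the full
  // edge group of one vertex and returns any number of results of type T.
  def groupReduceOnOutEdges[T](f: (Long, Seq[WeightedEdge]) => Seq[T]): Seq[T] =
    edges.groupBy(_.source).toSeq.sortBy(_._1).flatMap {
      case (id, group) => f(id, group)
    }

  // Emit (vertexId, edgeCount, minWeight, maxWeight), but only for vertices
  // with at least two outgoing edges; all other vertices emit nothing.
  val stats: Seq[(Long, Int, Double, Double)] =
    groupReduceOnOutEdges[(Long, Int, Double, Double)] { (id, group) =>
      if (group.size >= 2) {
        val weights = group.map(_.weight)
        Seq((id, group.size, weights.min, weights.max))
      } else Seq.empty
    }
}
```

Only vertex 1 has two outgoing edges in this sample, so stats contains a single tuple. A pairwise reduce could not express this filter, since it always yields exactly one value of the input type per vertex.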

Function Interfaces

Reduction Function Types

Key function interfaces for implementing custom neighborhood reductions:

// From Java Gelly - Reduce functions for simple aggregations
trait ReduceNeighborsFunction[VV] {
  def reduceNeighbors(firstNeighborValue: VV, secondNeighborValue: VV): VV
}

trait ReduceEdgesFunction[EV] {
  def reduceEdges(firstEdgeValue: EV, secondEdgeValue: EV): EV
}

Edge Direction Enumeration

Control which edges to consider in neighborhood operations:

// Java enum from Gelly: org.apache.flink.graph.EdgeDirection
public enum EdgeDirection {
  IN,   // Consider only incoming edges
  OUT,  // Consider only outgoing edges
  ALL   // Consider both incoming and outgoing edges
}

Usage Examples:

import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceEdgesFunction, ReduceNeighborsFunction, Vertex}
import org.apache.flink.api.scala._
import org.apache.flink.types.LongValue

val env = ExecutionEnvironment.getExecutionEnvironment

// Create sample graph with numeric vertex values
val vertices = env.fromCollection(Seq(
  new Vertex(1L, 10.0),
  new Vertex(2L, 20.0),
  new Vertex(3L, 30.0),
  new Vertex(4L, 40.0)
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 1.5),
  new Edge(2L, 3L, 2.5), 
  new Edge(3L, 4L, 3.5),
  new Edge(1L, 4L, 4.5)
))

val graph = Graph.fromDataSet(vertices, edges, env)

// Basic degree calculations
val inDegrees = graph.inDegrees()       // DataSet[(Long, LongValue)]
val outDegrees = graph.outDegrees()     // DataSet[(Long, LongValue)]
val allDegrees = graph.getDegrees()     // DataSet[(Long, LongValue)]

// Simple reductions on neighbors
val maxNeighborValue = graph.reduceOnNeighbors(
  new ReduceNeighborsFunction[Double] {
    override def reduceNeighbors(first: Double, second: Double): Double = {
      math.max(first, second)
    }
  },
  EdgeDirection.ALL
)

val sumEdgeWeights = graph.reduceOnEdges(
  new ReduceEdgesFunction[Double] {
    override def reduceEdges(first: Double, second: Double): Double = {
      first + second
    }
  },
  EdgeDirection.OUT
)

Advanced Analytics Examples

import org.apache.flink.util.Collector

// Custom edge analysis function: emits (vertexId, edgeCount, minWeight, maxWeight)
class EdgeStatistics extends EdgesFunction[Long, Double, (Long, Int, Double, Double)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])], 
                           out: Collector[(Long, Int, Double, Double)]): Unit = {
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      val vertexId = edgeList.head._1
      val edgeCount = edgeList.size
      val weights = edgeList.map(_._2.getValue)
      val minWeight = weights.min
      val maxWeight = weights.max
      
      out.collect((vertexId, edgeCount, minWeight, maxWeight))
    }
  }
}

// Apply custom edge statistics
val edgeStats = graph.groupReduceOnEdges(new EdgeStatistics(), EdgeDirection.OUT)

// Custom neighbor analysis with vertex access
class NeighborAnalysis extends NeighborsFunctionWithVertexValue[Long, Double, Double, (Long, Double, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, Double], 
                               neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Double])], 
                               out: Collector[(Long, Double, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val avgNeighborValue = neighborList.map(_._2.getValue).sum / neighborList.size
      val avgEdgeWeight = neighborList.map(_._1.getValue).sum / neighborList.size
      
      out.collect((vertex.getId, avgNeighborValue, avgEdgeWeight))
    }
  }
}

// Apply neighbor analysis
val neighborStats = graph.groupReduceOnNeighbors(new NeighborAnalysis(), EdgeDirection.ALL)

Analytical Patterns

Local Graph Properties

Calculate properties for individual vertices based on their neighborhoods:

// Skeleton for a per-vertex clustering coefficient. A NeighborsFunction sees only
// one vertex's neighborhood, so the number of edges among the neighbors is not
// available here; this placeholder always emits 0.0.
class ClusteringCoefficient extends NeighborsFunction[Long, Double, Double, (Long, Double)] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])], 
                               out: Collector[(Long, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.size >= 2) {
      val vertexId = neighborList.head._1
      val neighborIds = neighborList.map(_._3.getId).toSet

      // Denominator: maximum number of edges among the neighbors (undirected case)
      val possibleEdges = neighborIds.size * (neighborIds.size - 1) / 2
      // Placeholder: the count of actual neighbor-to-neighbor edges has to come
      // from a separate computation with access to the rest of the graph
      val clustering = 0.0

      out.collect((vertexId, clustering))
    }
  }
}
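
For reference, the quantity that the skeleton leaves open can be written out on a small in-memory graph. This is a plain-Scala sketch of the standard local clustering coefficient for an undirected graph, C(v) = 2 * links / (k * (k - 1)), where k is the number of neighbors of v and links is the number of edges among them. It is not a distributed implementation; the object name and the sample edges are hypothetical.

```scala
object ClusteringSketch {
  // Hypothetical undirected graph; each pair is one undirected edge
  val edges: Set[(Long, Long)] = Set((1L, 2L), (1L, 3L), (2L, 3L), (3L, 4L))

  // All vertices that share an edge with v
  def neighbors(v: Long): Set[Long] =
    edges.collect {
      case (a, b) if a == v => b
      case (a, b) if b == v => a
    }

  // True if a and b are joined by an edge in either orientation
  def connected(a: Long, b: Long): Boolean =
    edges.contains((a, b)) || edges.contains((b, a))

  // Fraction of neighbor pairs that are themselves connected;
  // defined as 0.0 here for vertices with fewer than two neighbors
  def localClustering(v: Long): Double = {
    val ns = neighbors(v).toSeq
    val k = ns.size
    if (k < 2) 0.0
    else {
      val links = ns.combinations(2).count(pair => connected(pair(0), pair(1)))
      2.0 * links / (k * (k - 1))
    }
  }
}
```

In this sample, vertex 1 has neighbors 2 and 3, which are connected, so its coefficient is 1.0. Vertex 3 has three neighbors with one connected pair out of three, giving 1/3.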

Aggregated Statistics

Compute graph-wide statistics by combining local measurements:

// Collect per-vertex degrees to the client, then summarize them locally.
// Note: collect() triggers execution and transfers every (vertexId, degree) pair
// to the client, so this only suits graphs whose vertex set fits in client memory.
val degrees = graph.getDegrees().collect().map(_._2.getValue)
val avgDegree = degrees.sum / degrees.length.toDouble
val maxDegree = degrees.max
val minDegree = degrees.min
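
Once the degrees are on the client, count, sum, minimum and maximum can be gathered in one pass, which also gives a natural way to handle an empty graph. The sketch below is plain Scala with hypothetical names (Summary, summarize) and made-up input values; it operates on an ordinary collection of degree values rather than on a Flink DataSet.

```scala
object DegreeSummarySketch {
  // Running aggregate over a collection of degree values
  final case class Summary(count: Long, sum: Long, min: Long, max: Long) {
    def mean: Double = sum.toDouble / count
  }

  // Single pass over the degrees; returns None when there are no vertices
  def summarize(degrees: Iterable[Long]): Option[Summary] =
    degrees.foldLeft(Option.empty[Summary]) {
      case (None, d) =>
        Some(Summary(1L, d, d, d))
      case (Some(s), d) =>
        Some(Summary(s.count + 1, s.sum + d, math.min(s.min, d), math.max(s.max, d)))
    }
}
```

For example, summarize(Seq(3L, 1L, 2L, 2L)) yields a summary with count 4, sum 8, minimum 1 and maximum 3, and therefore a mean of 2.0, while an empty input yields None instead of throwing.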

Performance Considerations

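  • Direction Selection: Use the narrowest EdgeDirection (IN, OUT, ALL) that answers the question. ALL considers each edge from both of its endpoints, so it handles roughly twice as many vertex-edge pairs as IN or OUT
  • Function Complexity: Reduce functions are called pairwise and repeatedly, so keep them cheap; they should also be associative and commutative so that the result does not depend on the order of evaluation
  • Memory Usage: Group reduction functions that materialize their input (for example with toList, as in the examples above) hold a vertex's entire neighborhood in memory, which can be costly for high-degree vertices
  • Parallelization: Group reduction operations are automatically parallelized across the cluster
  • Reuse of Results: When several metrics need the same neighborhood aggregate, compute it once and reuse the result rather than repeating the reduction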
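
One pitfall with pairwise reduction functions is worth a small illustration. Assuming the runtime is free to combine values in any grouping, as is typical for distributed reduces, a function that is not associative gives grouping-dependent results. The plain-Scala sketch below uses hypothetical names and values to contrast a naive pairwise average with an associative (sum, count) accumulator.

```scala
object AssociativitySketch {
  // Hypothetical neighbor values
  val values: Seq[Double] = Seq(10.0, 20.0, 40.0)

  // Not associative: averaging two values at a time depends on the grouping
  def pairAverage(a: Double, b: Double): Double = (a + b) / 2.0

  val leftGrouped: Double = pairAverage(pairAverage(10.0, 20.0), 40.0)
  val rightGrouped: Double = pairAverage(10.0, pairAverage(20.0, 40.0))

  // Associative alternative: combine (sum, count) pairs, divide once at the end
  def combine(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)

  val (total, count) = values.map(v => (v, 1L)).reduce((a, b) => combine(a, b))
  val mean: Double = total / count
}
```

Here leftGrouped is 27.5 and rightGrouped is 20.0, while the true mean is 70/3. Because a pairwise reduce returns the same type it consumes, carrying a (sum, count) pair would require the values being reduced to already have that shape.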

The analytics API combines built-in metrics (vertex degrees) with reduce and group-reduce hooks for custom graph analysis, all executed as distributed computations within Flink.