Graph Analytics

API reference for graph analytics on a Gelly Scala Graph[K, VV, EV]: degree calculations, structural transformations, validation, set operations, and neighborhood aggregations.

Degree Calculations

Basic Degree Metrics

def inDegrees(): DataSet[(K, LongValue)]

Returns the in-degree of all vertices in the graph as a DataSet of (vertexId, inDegree) tuples.

def outDegrees(): DataSet[(K, LongValue)]

Returns the out-degree of all vertices in the graph as a DataSet of (vertexId, outDegree) tuples.

def getDegrees(): DataSet[(K, LongValue)]

Returns the total degree (in-degree + out-degree) of all vertices as a DataSet of (vertexId, degree) tuples.
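Here is a minimal plain-Scala model of the three degree definitions over an in-memory edge list. It does not use Flink. The SimpleEdge class and the countBy helper are hypothetical names introduced only for illustration. Following the descriptions above, every vertex gets an entry, including vertices that have no edges.

```scala
// Plain-Scala model (no Flink): vertices are Long IDs, edges are (src, trg) pairs.
final case class SimpleEdge(src: Long, trg: Long)

val vertexIds: Seq[Long] = Seq(1L, 2L, 3L, 4L)
val edges: Seq[SimpleEdge] = Seq(SimpleEdge(1L, 2L), SimpleEdge(1L, 3L), SimpleEdge(2L, 3L))

// Count the edges attached to each vertex via `key`; vertices with no such edge get 0.
def countBy(key: SimpleEdge => Long): Map[Long, Long] = {
  val counts = edges.groupBy(key).map { case (id, es) => id -> es.size.toLong }
  vertexIds.map(id => id -> counts.getOrElse(id, 0L)).toMap
}

val inDegrees: Map[Long, Long] = countBy(_.trg)  // edges ending at the vertex
val outDegrees: Map[Long, Long] = countBy(_.src) // edges starting at the vertex
// Total degree = in-degree + out-degree
val degrees: Map[Long, Long] =
  vertexIds.map(id => id -> (inDegrees(id) + outDegrees(id))).toMap
```

Vertex 4 has no edges and therefore reports 0 in all three maps.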

Structural Transformations

Graph Structure Modifications

def getUndirected(): Graph[K, VV, EV]

Creates an undirected version of the graph by adding all inverse-direction edges. Each edge (u,v) results in both (u,v) and (v,u) edges.

def reverse(): Graph[K, VV, EV]

Reverses the direction of all edges in the graph. Edge (u,v) becomes (v,u).
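The following plain-Scala sketch shows what the two transformations do to an edge list. It does not use Flink, and SimpleEdge, reverseEdges and undirectedEdges are hypothetical names. The undirected model simply appends the reversed copies without deduplicating. An edge pair that already exists in both directions would therefore appear twice in this sketch.

```scala
// Plain-Scala model (no Flink): a hypothetical edge type with a value.
final case class SimpleEdge[K, EV](src: K, trg: K, value: EV)

// reverse(): every edge (u, v) becomes (v, u); edge values are kept.
def reverseEdges[K, EV](edges: Seq[SimpleEdge[K, EV]]): Seq[SimpleEdge[K, EV]] =
  edges.map(e => SimpleEdge(e.trg, e.src, e.value))

// getUndirected(): keep every original edge and add its reversed copy.
def undirectedEdges[K, EV](edges: Seq[SimpleEdge[K, EV]]): Seq[SimpleEdge[K, EV]] =
  edges ++ reverseEdges(edges)

val edges = Seq(SimpleEdge(1L, 2L, 0.5), SimpleEdge(2L, 3L, 1.5))
val reversed = reverseEdges(edges)
val undirected = undirectedEdges(edges)
```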

Validation

def validate(validator: GraphValidator[K, VV, EV]): Boolean

Validates the graph against the rules defined by the given GraphValidator. Returns true if the graph satisfies them and false otherwise.

Parameters:

  • validator - GraphValidator that defines validation rules
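The sketch below models, in plain Scala without Flink, one rule a validator might check: every edge endpoint must be a known vertex ID. SimpleValidator and EdgeEndpointsExist are hypothetical stand-ins for a GraphValidator implementation, not Gelly types.

```scala
// Plain-Scala model (no Flink): hypothetical types standing in for a GraphValidator.
final case class SimpleEdge(src: Long, trg: Long)

// A validator inspects the whole graph and answers true (valid) or false.
trait SimpleValidator {
  def validate(vertexIds: Set[Long], edges: Seq[SimpleEdge]): Boolean
}

// Rule: every edge's source and target must be an existing vertex ID.
object EdgeEndpointsExist extends SimpleValidator {
  def validate(vertexIds: Set[Long], edges: Seq[SimpleEdge]): Boolean =
    edges.forall(e => vertexIds.contains(e.src) && vertexIds.contains(e.trg))
}

val ids = Set(1L, 2L, 3L)
val okEdges = Seq(SimpleEdge(1L, 2L), SimpleEdge(2L, 3L))
val badEdges = okEdges :+ SimpleEdge(3L, 99L) // 99 is not a vertex ID
```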

Set Operations

Graph Union

def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]

Returns the union of the vertex sets and of the edge sets of the two input graphs. Duplicate vertices are removed, but duplicate edges are kept.

Parameters:

  • graph - The graph to perform union with
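Here is a plain-Scala model of the union semantics described above. It does not use Flink, and SimpleVertex, SimpleEdge, SimpleGraph and union are hypothetical names. The sketch assumes that two vertices count as duplicates only when both ID and value match. Edges are concatenated, so an edge present in both graphs appears twice.

```scala
// Plain-Scala model (no Flink): hypothetical vertex, edge and graph types.
final case class SimpleVertex(id: Long, value: String)
final case class SimpleEdge(src: Long, trg: Long, value: Double)
final case class SimpleGraph(vertices: Seq[SimpleVertex], edges: Seq[SimpleEdge])

// Vertices: set union (duplicates dropped). Edges: concatenation (duplicates kept).
def union(g1: SimpleGraph, g2: SimpleGraph): SimpleGraph =
  SimpleGraph((g1.vertices ++ g2.vertices).distinct, g1.edges ++ g2.edges)

val g1 = SimpleGraph(
  Seq(SimpleVertex(1L, "a"), SimpleVertex(2L, "b")),
  Seq(SimpleEdge(1L, 2L, 1.0)))
val g2 = SimpleGraph(
  Seq(SimpleVertex(1L, "a"), SimpleVertex(2L, "b"), SimpleVertex(3L, "c")),
  Seq(SimpleEdge(1L, 2L, 1.0), SimpleEdge(2L, 3L, 2.0)))
val combined = union(g1, g2)
```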

Graph Difference

def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]

Returns the difference of the vertex and edge sets: vertices and edges that also appear in the given graph are removed. When a vertex is removed, every edge that has it as source or target is removed as well.

Parameters:

  • graph - The graph to perform difference with
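The following plain-Scala sketch models these semantics. It does not use Flink, and all type and function names are hypothetical. Matching vertices by ID rather than by ID and value is an assumption of this model.

```scala
// Plain-Scala model (no Flink): hypothetical vertex, edge and graph types.
final case class SimpleVertex(id: Long, value: String)
final case class SimpleEdge(src: Long, trg: Long)
final case class SimpleGraph(vertices: Seq[SimpleVertex], edges: Seq[SimpleEdge])

def difference(g1: SimpleGraph, g2: SimpleGraph): SimpleGraph = {
  val removedIds = g2.vertices.map(_.id).toSet // vertices matched by ID (assumption)
  val otherEdges = g2.edges.toSet
  val keptVertices = g1.vertices.filterNot(v => removedIds.contains(v.id))
  // Drop common edges and any edge whose source or target was removed.
  val keptEdges = g1.edges.filterNot { e =>
    otherEdges.contains(e) || removedIds.contains(e.src) || removedIds.contains(e.trg)
  }
  SimpleGraph(keptVertices, keptEdges)
}

val g1 = SimpleGraph(
  Seq(SimpleVertex(1L, "a"), SimpleVertex(2L, "b"), SimpleVertex(3L, "c")),
  Seq(SimpleEdge(1L, 2L), SimpleEdge(2L, 3L), SimpleEdge(1L, 3L)))
val g2 = SimpleGraph(Seq(SimpleVertex(3L, "c")), Seq.empty)
val diff = difference(g1, g2) // vertex 3 and both edges touching it disappear
```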

Graph Intersection

def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]

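Performs intersection on the edge sets of the input graphs. Edges are considered equal if they have the same source identifier, target identifier, and edge value. Vertices in the resulting graph carry no values, which is why the result has the NullValue vertex type.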

Parameters:

  • graph - The graph to perform intersection with
  • distinctEdges - If true, exactly one edge represents all pairs of equal edges; if false, both edges of each pair are included
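Here is a plain-Scala model of the pairing rule. It does not use Flink, and the names are hypothetical. Suppose an edge occurs a times in one graph and b times in the other. It then forms min(a, b) pairs. With distinctEdges = true, one copy survives; with false, both edges of every pair survive. Deriving the output vertex IDs from the endpoints of the surviving edges is an assumption suggested by the NullValue vertex type.

```scala
// Plain-Scala model (no Flink): hypothetical edge type.
final case class SimpleEdge(src: Long, trg: Long, value: Double)

// Edges are equal when source, target and value all match (case-class equality).
def intersectEdges(e1: Seq[SimpleEdge], e2: Seq[SimpleEdge], distinctEdges: Boolean): Seq[SimpleEdge] = {
  val counts1 = e1.groupBy(identity).map { case (e, xs) => e -> xs.size }
  val counts2 = e2.groupBy(identity).map { case (e, xs) => e -> xs.size }
  counts1.toSeq.flatMap { case (edge, count1) =>
    // Each edge instance joins at most one pair.
    val pairs = math.min(count1, counts2.getOrElse(edge, 0))
    if (pairs == 0) Seq.empty[SimpleEdge]
    else if (distinctEdges) Seq(edge) // one edge stands for all pairs
    else Seq.fill(2 * pairs)(edge)    // both edges of every pair
  }
}

// Output vertices: the endpoints of the surviving edges, without values (assumption).
def intersectVertexIds(edges: Seq[SimpleEdge]): Set[Long] =
  edges.flatMap(e => Seq(e.src, e.trg)).toSet

val a = SimpleEdge(1L, 2L, 1.0)
val b = SimpleEdge(2L, 3L, 1.0)
val c = SimpleEdge(1L, 2L, 2.0) // same endpoints as a but a different value, so not equal
val left = Seq(a, a, b)
val right = Seq(a, c)
```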

Neighborhood Operations

Edge-based Aggregations

def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]

Computes an aggregate over the edges of each vertex without access to the vertex value. The function receives the vertex's edges as (vertexId, edge) pairs.

Parameters:

  • edgesFunction - Function to apply to the edges of each vertex
  • direction - Edge direction (IN, OUT, ALL)

def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

Computes an aggregate over the edges of each vertex with access to the vertex value.

Parameters:

  • edgesFunction - Function that has access to both vertex value and edges
  • direction - Edge direction (IN, OUT, ALL)
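The sketch below is a plain-Scala model of how edges might be grouped per vertex for each direction and handed to an EdgesFunction-like callback. It does not use Flink, and Direction, SimpleEdge, edgesPerVertex and groupReduceOnEdges are hypothetical names in this sketch. Under ALL, each edge is delivered to both endpoints. The groups are built from the edges, so in this model a vertex with no edges in the chosen direction is never passed to the callback.

```scala
// Plain-Scala model (no Flink): hypothetical types, not Gelly's classes.
final case class SimpleEdge(src: Long, trg: Long, value: Double)

sealed trait Direction
case object In extends Direction
case object Out extends Direction
case object All extends Direction

// Group edges by the vertex that "sees" them: target for In, source for Out, both for All.
def edgesPerVertex(edges: Seq[SimpleEdge], dir: Direction): Map[Long, Seq[SimpleEdge]] = {
  val keyed: Seq[(Long, SimpleEdge)] = dir match {
    case In  => edges.map(e => e.trg -> e)
    case Out => edges.map(e => e.src -> e)
    case All => edges.flatMap(e => Seq(e.src -> e, e.trg -> e))
  }
  keyed.groupBy(_._1).map { case (id, pairs) => id -> pairs.map(_._2) }
}

// Call `f` once per vertex group with (vertexId, edge) pairs; `f` emits results via the callback.
def groupReduceOnEdges[T](edges: Seq[SimpleEdge], dir: Direction)(
    f: (Seq[(Long, SimpleEdge)], T => Unit) => Unit): List[T] = {
  val results = scala.collection.mutable.ListBuffer.empty[T]
  for ((id, es) <- edgesPerVertex(edges, dir)) {
    f(es.map(e => (id, e)), (t: T) => { results += t; () })
  }
  results.toList
}

val edges = Seq(SimpleEdge(1L, 2L, 1.0), SimpleEdge(1L, 3L, 2.5), SimpleEdge(2L, 3L, 4.0))

// Sum of outgoing edge values per vertex; vertex 3 has no out-edges and is not visited.
val outSums = groupReduceOnEdges[(Long, Double)](edges, Out) { (group, emit) =>
  emit((group.head._1, group.map(_._2.value).sum))
}

// Number of incident edges per vertex when both directions are considered.
val allCounts = groupReduceOnEdges[(Long, Int)](edges, All) { (group, emit) =>
  emit((group.head._1, group.size))
}
```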

Neighbor-based Aggregations

def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

Computes an aggregate over the neighborhood (adjacent edges and neighbor vertices) of each vertex. The function receives (vertexId, edge, neighborVertex) triples.

Parameters:

  • neighborsFunction - Function to apply to the neighborhood
  • direction - Edge direction (IN, OUT, ALL)

def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

Computes an aggregate over the neighbors of each vertex, with access to that vertex's own value.

Parameters:

  • neighborsFunction - Function that has access to source vertex and neighbors
  • direction - Edge direction (IN, OUT, ALL)
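The following plain-Scala sketch models the (vertexId, edge, neighbor) triples a NeighborsFunction receives. It does not use Flink, and all names are hypothetical. The neighbor is the opposite endpoint of each edge. The sketch assumes every edge endpoint exists in the vertex list.

```scala
// Plain-Scala model (no Flink): hypothetical types, not Gelly's Vertex/Edge.
final case class SimpleVertex(id: Long, value: String)
final case class SimpleEdge(src: Long, trg: Long, value: Double)

sealed trait Direction
case object In extends Direction
case object Out extends Direction
case object All extends Direction

// For each vertex, the (vertexId, edge, neighbor) triples a NeighborsFunction-style callback would see.
def neighborhoods(
    vertices: Seq[SimpleVertex],
    edges: Seq[SimpleEdge],
    dir: Direction): Map[Long, Seq[(Long, SimpleEdge, SimpleVertex)]] = {
  val byId = vertices.map(v => v.id -> v).toMap // assumes every endpoint is a known vertex
  val triples: Seq[(Long, SimpleEdge, SimpleVertex)] = dir match {
    case Out => edges.map(e => (e.src, e, byId(e.trg))) // neighbor = target
    case In  => edges.map(e => (e.trg, e, byId(e.src))) // neighbor = source
    case All => edges.flatMap(e => Seq((e.src, e, byId(e.trg)), (e.trg, e, byId(e.src))))
  }
  triples.groupBy(_._1)
}

val vertices = Seq(SimpleVertex(1L, "a"), SimpleVertex(2L, "b"), SimpleVertex(3L, "c"))
val edges = Seq(SimpleEdge(1L, 2L, 1.0), SimpleEdge(1L, 3L, 1.0), SimpleEdge(2L, 3L, 1.0))

// Example aggregation: sorted values of each vertex's out-neighbors.
val outNeighborNames: Map[Long, List[String]] =
  neighborhoods(vertices, edges, Out).map { case (id, ns) => id -> ns.map(_._3.value).sorted.toList }
```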

Reduction Operations

Neighbor Value Reduction

def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]

Computes a reduce transformation over the neighbors' vertex values of each vertex. The function consecutively combines pairs of neighbor vertex values until only a single value remains.

Parameters:

  • reduceNeighborsFunction - Reduce function to apply to neighbor values
  • direction - Edge direction (IN, OUT, ALL)

Returns: DataSet of (vertexId, aggregatedValue) tuples
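Here is a plain-Scala sketch of the pair-wise reduction, covering the OUT direction only. It does not use Flink, and the names are hypothetical. A distributed runtime may combine values in any order. A function whose result does not depend on that order, such as the integer sum used here, is therefore the safe choice. String concatenation, by contrast, can produce values in varying order.

```scala
// Plain-Scala model (no Flink): hypothetical types, OUT direction only.
final case class SimpleVertex(id: Long, value: Int)
final case class SimpleEdge(src: Long, trg: Long)

// For each vertex with outgoing edges, combine its out-neighbors' values pair-wise
// with `combine` until a single value remains.
def reduceOnOutNeighbors(vertices: Seq[SimpleVertex], edges: Seq[SimpleEdge])(
    combine: (Int, Int) => Int): Map[Long, Int] = {
  val valueOf = vertices.map(v => v.id -> v.value).toMap
  edges.groupBy(_.src).map { case (id, es) =>
    id -> es.map(e => valueOf(e.trg)).reduce(combine)
  }
}

val vertices = Seq(SimpleVertex(1L, 10), SimpleVertex(2L, 20), SimpleVertex(3L, 30))
val edges = Seq(SimpleEdge(1L, 2L), SimpleEdge(1L, 3L), SimpleEdge(2L, 3L))

// Summation gives the same result whatever order the values are combined in.
val outNeighborSums: Map[Long, Int] = reduceOnOutNeighbors(vertices, edges)(_ + _)
```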

Edge Value Reduction

def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]

Computes a reduce transformation over the edge values of each vertex. The function consecutively combines pairs of edge values until only a single value remains.

Parameters:

  • reduceEdgesFunction - Reduce function to apply to edge values
  • direction - Edge direction (IN, OUT, ALL)

Returns: DataSet of (vertexId, aggregatedValue) tuples
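The edge-value counterpart is sketched below in plain Scala, covering the IN direction only. It does not use Flink, and the names are hypothetical. Here the reduce keeps the largest incoming edge value per vertex.

```scala
// Plain-Scala model (no Flink): hypothetical edge type, IN direction only.
final case class SimpleEdge(src: Long, trg: Long, value: Double)

// For each vertex with incoming edges, combine those edges' values pair-wise.
def reduceOnInEdges(edges: Seq[SimpleEdge])(combine: (Double, Double) => Double): Map[Long, Double] =
  edges.groupBy(_.trg).map { case (id, es) => id -> es.map(_.value).reduce(combine) }

val edges = Seq(SimpleEdge(1L, 3L, 0.5), SimpleEdge(2L, 3L, 2.0), SimpleEdge(1L, 2L, 1.5))

// Keep the largest incoming edge value per vertex.
val maxIncoming: Map[Long, Double] = reduceOnInEdges(edges)((a, b) => math.max(a, b))
```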

Custom Function Types

EdgesFunction

abstract class EdgesFunction[K, EV, T] {
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}

Abstract base class for functions that operate on the edges of a vertex.

EdgesFunctionWithVertexValue

abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] {
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}

Abstract base class for functions that operate on edges with access to the vertex value.

NeighborsFunction

abstract class NeighborsFunction[K, VV, EV, T] {
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], out: Collector[T]): Unit
}

Abstract base class for functions that operate on vertex neighbors (edges and adjacent vertices).

NeighborsFunctionWithVertexValue

abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] {
  def iterateNeighbors(vertex: Vertex[K, VV], neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])], out: Collector[T]): Unit
}

Abstract base class for functions that operate on a vertex's neighbors with access to that vertex itself (its ID and value).

Edge Direction Enum

// Java enum org.apache.flink.graph.EdgeDirection, used directly from the Scala API
public enum EdgeDirection {
  IN, OUT, ALL
}

Enumeration for specifying edge directions in neighborhood operations:

  • IN - Consider only incoming edges
  • OUT - Consider only outgoing edges
  • ALL - Consider both incoming and outgoing edges

Usage Examples

Degree Analysis

import org.apache.flink.api.scala._

// Assumes graph is an existing Graph[Long, String, Double]
// Calculate all degree metrics as (vertexId, degree) pairs
val inDegrees = graph.inDegrees()
val outDegrees = graph.outDegrees()
val totalDegrees = graph.getDegrees()

// Find vertices with an out-degree above 10 (LongValue.getValue returns a primitive long)
val highOutDegree = outDegrees.filter(_._2.getValue > 10)

Structural Operations

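// Assumes graph and otherGraph are both Graph[Long, String, Double]

// Create undirected version: every edge (u, v) also gets an edge (v, u)
val undirectedGraph = graph.getUndirected()

// Reverse all edges
val reversedGraph = graph.reverse()

// Union: duplicate vertices are removed, duplicate edges are kept
val combinedGraph = graph.union(otherGraph)

// Intersection of the edge sets; the result is a Graph[Long, NullValue, Double]
val intersection = graph.intersect(otherGraph, distinctEdges = true)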

Neighborhood Aggregations

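import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction}
import org.apache.flink.graph.scala.EdgesFunction
import org.apache.flink.util.Collector

// Custom edge aggregation: sum the values of each vertex's edges.
// Each element of edges is (vertexId, edge), so the ID is emitted alongside the sum.
class SumEdgeValues extends EdgesFunction[Long, Double, (Long, Double)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])], out: Collector[(Long, Double)]): Unit = {
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      out.collect((edgeList.head._1, edgeList.map(_._2.getValue).sum))
    }
  }
}

// Apply edge aggregation over outgoing edges, giving (vertexId, sum) pairs
val edgeSums = graph.groupReduceOnEdges(new SumEdgeValues(), EdgeDirection.OUT)

// Concatenate the String values of all neighbors; the concatenation order is not guaranteed
val neighborValues = graph.reduceOnNeighbors(
  new ReduceNeighborsFunction[String] {
    override def reduceNeighbors(firstNeighborValue: String, secondNeighborValue: String): String = {
      firstNeighborValue + "," + secondNeighborValue
    }
  },
  EdgeDirection.ALL
)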

Advanced Analytics

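import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.graph.scala.EdgesFunctionWithVertexValue
import org.apache.flink.util.Collector

// Calculate the average outgoing edge weight per vertex
class AverageEdgeWeight extends EdgesFunctionWithVertexValue[Long, String, Double, (Long, Double)] {
  override def iterateEdges(vertex: Vertex[Long, String], edges: Iterable[Edge[Long, Double]], out: Collector[(Long, Double)]): Unit = {
    val edgeList = edges.toList
    // Skip vertices that have no edges in the requested direction
    if (edgeList.nonEmpty) {
      val average = edgeList.map(_.getValue).sum / edgeList.size
      out.collect((vertex.getId, average))
    }
  }
}

val avgWeights = graph.groupReduceOnEdges(new AverageEdgeWeight(), EdgeDirection.OUT)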