
Iterative Algorithms

Complete API for implementing custom graph algorithms using different iteration patterns and the algorithm framework.

Iteration Patterns Overview

Flink Gelly Scala provides three main iteration patterns for implementing graph algorithms:

  1. Scatter-Gather - Vertices scatter messages along edges and gather/apply updates
  2. Gather-Sum-Apply (GSA) - Three-phase pattern (gather, sum, apply) in which each vertex pulls values from its neighbors, aggregates them, and updates its own value
  3. Vertex-Centric (Pregel) - Pregel-style vertex-centric iterations with message passing

Scatter-Gather Iterations

Basic Scatter-Gather

def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]

Runs a scatter-gather iteration on the graph without configuration options.

Parameters:

  • scatterFunction - Function that scatters messages from vertices to neighbors
  • gatherFunction - Function that gathers messages and updates vertex values
  • maxIterations - Maximum number of iterations to perform

Returns: Updated graph after convergence or maximum iterations
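To make the superstep semantics concrete, here is a plain-Scala model of a scatter-gather iteration that does not use Flink at all. It is a sketch under simplifying assumptions: the graph is an in-memory edge list, the names ScatterGatherSim, Edge, and sssp are hypothetical, and the algorithm is single-source shortest paths. Each superstep scatters from the vertices updated in the previous one, gathers the minimum message per target, and the loop stops at maxIterations or when no value changes.

```scala
// Plain-Scala model of scatter-gather supersteps; no Flink involved.
// ScatterGatherSim, Edge and sssp are hypothetical names for illustration.
object ScatterGatherSim {
  final case class Edge(src: Long, dst: Long, weight: Double)

  def sssp(vertices: Seq[Long], edges: Seq[Edge], source: Long, maxIterations: Int): Map[Long, Double] = {
    var values: Map[Long, Double] =
      vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
    var active: Set[Long] = vertices.toSet // every vertex is active in superstep 1
    var superstep = 1
    while (superstep <= maxIterations && active.nonEmpty) {
      // Scatter: active, reachable vertices send distance + weight along their out-edges
      val messages: Seq[(Long, Double)] =
        for (e <- edges if active(e.src) && values(e.src) < Double.PositiveInfinity)
          yield e.dst -> (values(e.src) + e.weight)
      // Gather: each target keeps the minimum message and updates only if it improves
      val updates: Map[Long, Double] = messages
        .groupBy(_._1)
        .map { case (v, msgs) => v -> msgs.map(_._2).min }
        .filter { case (v, d) => d < values(v) }
      values = values ++ updates
      active = updates.keySet // only updated vertices scatter in the next superstep
      superstep += 1
    }
    values
  }
}
```

Unreachable vertices keep Double.PositiveInfinity, and lowering maxIterations cuts propagation off early, which mirrors the "convergence or maximum iterations" behavior described above.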

Configured Scatter-Gather

def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int,
  parameters: ScatterGatherConfiguration
): Graph[K, VV, EV]

Runs a scatter-gather iteration with configuration options.

Parameters:

  • scatterFunction - Message scattering function
  • gatherFunction - Message gathering and vertex update function
  • maxIterations - Maximum iterations
  • parameters - Iteration configuration parameters
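A minimal configuration sketch, assuming `graph` is a `Graph[Long, Double, Double]` and reusing the DistanceScatter and DistanceGather classes from the usage examples. The job name and parallelism are arbitrary illustrative values; the setters return Unit, so they are called one per statement.

```scala
import org.apache.flink.graph.spargel.ScatterGatherConfiguration

val parameters = new ScatterGatherConfiguration
parameters.setName("SSSP scatter-gather")      // name of the iteration, shown in the job plan
parameters.setParallelism(4)                    // parallelism of the iteration operators
parameters.setSolutionSetUnmanagedMemory(true)  // keep the solution set as a heap object map
parameters.setOptNumVertices(true)              // expose getNumberOfVertices in scatter and gather

val shortestPaths = graph.runScatterGatherIteration(
  new DistanceScatter(), new DistanceGather(), 10, parameters)
```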

Gather-Sum-Apply Iterations

Basic GSA

def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]

Runs a Gather-Sum-Apply iteration without configuration options.

Parameters:

  • gatherFunction - Collects information about adjacent vertices and edges
  • sumFunction - Aggregates the gathered information
  • applyFunction - Updates vertex values with the aggregated data
  • maxIterations - Maximum number of iterations
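The three phases can be modeled in plain Scala (no Flink) as follows. This is a simplified sketch with hypothetical names (GsaSim, superstep, connectedComponents): the gather step sees only the neighbor's value and ignores edge values, every vertex naively gathers from all of its neighbors in every superstep, and the apply step returns an Option instead of updating the vertex in place. It computes connected components by propagating the smallest vertex ID.

```scala
// Plain-Scala model of Gather-Sum-Apply supersteps; no Flink involved.
// GsaSim, superstep and connectedComponents are hypothetical names.
object GsaSim {
  // One superstep: gather from every neighbor, reduce with sum, then let applyUpdate decide
  def superstep[VV, M](values: Map[Long, VV],
                       neighbors: Map[Long, Seq[Long]],
                       gather: VV => M,
                       sum: (M, M) => M,
                       applyUpdate: (M, VV) => Option[VV]): Map[Long, VV] =
    values.map { case (v, current) =>
      val gathered = neighbors.getOrElse(v, Seq.empty).map(n => gather(values(n)))
      // A vertex with no neighbors has nothing to sum and keeps its value
      val updated = if (gathered.isEmpty) None else applyUpdate(gathered.reduce(sum), current)
      v -> updated.getOrElse(current)
    }

  // Connected components by min-label propagation on an undirected adjacency map
  def connectedComponents(neighbors: Map[Long, Seq[Long]], maxIterations: Int): Map[Long, Long] = {
    var labels: Map[Long, Long] = neighbors.keys.map(v => v -> v).toMap
    var changed = true
    var iteration = 0
    while (changed && iteration < maxIterations) {
      val next = superstep[Long, Long](
        labels,
        neighbors,
        gather = label => label,                                // neighbor's component ID
        sum = (a, b) => math.min(a, b),                         // smallest ID wins
        applyUpdate = (m, cur) => if (m < cur) Some(m) else None // adopt only a smaller ID
      )
      changed = next != labels
      labels = next
      iteration += 1
    }
    labels
  }
}
```

Because min is associative and commutative, the order in which gathered values are summed does not affect the result.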

Configured GSA

def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int,
  parameters: GSAConfiguration
): Graph[K, VV, EV]

Runs a GSA iteration with configuration parameters.

Parameters:

  • gatherFunction - Information gathering function
  • sumFunction - Aggregation function
  • applyFunction - Vertex update function
  • maxIterations - Maximum iterations
  • parameters - GSA configuration parameters

Vertex-Centric Iterations (Pregel)

Basic Vertex-Centric

def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int
): Graph[K, VV, EV]

Runs a vertex-centric iteration without configuration options.

Parameters:

  • computeFunction - Vertex compute function that processes messages and updates values
  • combineFunction - Optional message combiner; pass null to run without one
  • maxIterations - Maximum number of iterations
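The following plain-Scala model (no Flink) sketches the Pregel-style execution behind a vertex-centric iteration, using the classic maximum-value propagation example. Its assumptions: an in-memory adjacency map, hypothetical names PregelSim and maxValue, compute running for every vertex in superstep 1 and afterwards only for vertices that received messages, and termination when no messages are in flight. The optional combiner plays the role of a message combiner by reducing all messages addressed to one vertex to a single message before delivery.

```scala
// Plain-Scala model of vertex-centric (Pregel-style) supersteps; no Flink involved.
// PregelSim and maxValue are hypothetical names.
object PregelSim {
  def maxValue(initial: Map[Long, Int],
               outEdges: Map[Long, Seq[Long]],
               maxIterations: Int,
               combiner: Option[(Int, Int) => Int]): Map[Long, Int] = {
    var values = initial
    var inbox = Map.empty[Long, Seq[Int]]
    var superstep = 1
    var halted = false
    while (!halted && superstep <= maxIterations) {
      // compute runs for every vertex in superstep 1, afterwards only for message recipients
      val toCompute = if (superstep == 1) values.keys.toSeq else inbox.keys.toSeq
      var outbox = Vector.empty[(Long, Int)]
      for (v <- toCompute) {
        val best = (values(v) +: inbox.getOrElse(v, Seq.empty)).max
        // Announce the own value in superstep 1; later, forward only an improved value
        if (superstep == 1 || best > values(v)) {
          values = values.updated(v, best)
          for (t <- outEdges.getOrElse(v, Seq.empty)) outbox = outbox :+ (t -> best)
        }
      }
      // Optional combiner: collapse all messages for one target into a single message
      inbox = outbox.groupBy(_._1).map { case (t, msgs) =>
        val payloads: Seq[Int] = msgs.map(_._2)
        t -> (combiner match {
          case Some(combine) => Seq(payloads.reduce(combine))
          case None          => payloads
        })
      }
      halted = inbox.isEmpty // no messages in flight: the iteration has converged
      superstep += 1
    }
    values
  }
}
```

Running it with None or with Some(max) yields the same values; the combiner only reduces how many messages each vertex receives.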

Configured Vertex-Centric

def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int,
  parameters: VertexCentricConfiguration
): Graph[K, VV, EV]

Runs a vertex-centric iteration with configuration parameters.

Parameters:

  • computeFunction - Vertex computation function
  • combineFunction - Message combiner for reducing messages
  • maxIterations - Maximum iterations
  • parameters - Vertex-centric configuration parameters

Algorithm Framework Integration

Algorithm Execution

def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T

Executes a graph algorithm that implements the GraphAlgorithm interface.

Parameters:

  • algorithm - Graph algorithm implementation

Returns: Algorithm result of type T

Analytics Execution

def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]

Executes a graph analytic. Analytics are terminal operations whose results are retrieved via accumulators, allowing composition of multiple analytics and algorithms into a single program.

Parameters:

  • analytic - Graph analytic implementation

Returns: The analytic instance; call its getResult() method after the program has executed to retrieve the result

Function Interfaces

Scatter Function

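abstract class ScatterFunction[K, VV, M, EV] {
  def sendMessages(vertex: Vertex[K, VV]): Unit
  def sendMessageTo(target: K, m: M): Unit
  def sendMessageToAllNeighbors(m: M): Unit
  def getEdges: java.lang.Iterable[Edge[K, EV]]
  def getSuperstepNumber: Int
}

Base class for scatter functions that send messages from vertices to their neighbors. getEdges returns the vertex's edges in the configured direction (outgoing by default, changeable with ScatterGatherConfiguration.setDirection); it may be called only once per sendMessages invocation and cannot be combined with sendMessageToAllNeighbors.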

Gather Function (Scatter-Gather)

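abstract class GatherFunction[K, VV, M] {
  def updateVertex(vertex: Vertex[K, VV], inMessages: MessageIterator[M]): Unit
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
}

Base class for gather functions in scatter-gather iterations. updateVertex receives the messages sent to a vertex in the current superstep. The vertex value changes only if setNewVertexValue is called, and only vertices whose value changed scatter messages in the next superstep.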

Gather Function (GSA)

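abstract class GatherFunction[VV, EV, M] {
  def gather(neighbor: Neighbor[VV, EV]): M
}

Base class for gather functions in GSA iterations. The Neighbor argument exposes the neighbor's value (getNeighborValue) and the value of the connecting edge (getEdgeValue).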

Sum Function

abstract class SumFunction[VV, EV, M] {
  def sum(newValue: M, currentValue: M): M
}

Base class for sum functions that aggregate gathered information in GSA iterations.

Apply Function

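abstract class ApplyFunction[K, VV, M] {
  def apply(newValue: M, currentValue: VV): Unit
  def setResult(value: VV): Unit
  def getSuperstepNumber: Int
}

Base class for apply functions that update vertex values in GSA iterations. apply does not return the new value: it calls setResult to update the vertex, and a vertex for which setResult is not called keeps its current value.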

Compute Function

abstract class ComputeFunction[K, VV, EV, M] {
  def compute(vertex: Vertex[K, VV], messages: MessageIterator[M]): Unit
  def sendMessageTo(target: K, m: M): Unit
  def sendMessageToAllNeighbors(m: M): Unit
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
  def getEdges: java.lang.Iterable[Edge[K, EV]]
}

Base class for compute functions in vertex-centric iterations. getEdges returns the vertex's outgoing edges and, as in scatter functions, may be called only once per compute invocation.

Message Combiner

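abstract class MessageCombiner[K, M] {
  def combineMessages(messages: MessageIterator[M]): Unit
  def sendCombinedMessage(combinedMessage: M): Unit
}

Base class for combining multiple messages sent to the same vertex before they are delivered. combineMessages consumes the messages and emits the combined result with sendCombinedMessage.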

Configuration Classes

ScatterGatherConfiguration

class ScatterGatherConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanaged: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}

Configuration for scatter-gather iterations. The setters return Unit, so calls cannot be chained. setOptNumVertices(true) makes the total number of vertices available in the scatter and gather functions through getNumberOfVertices.

GSAConfiguration

class GSAConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanaged: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}

Configuration for gather-sum-apply iterations. As with the scatter-gather configuration, the setters return Unit, and setOptNumVertices(true) makes getNumberOfVertices available in the gather, sum, and apply functions.

VertexCentricConfiguration

class VertexCentricConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanaged: Boolean): Unit
}

Configuration for vertex-centric iterations. Unlike the other two configuration classes, it has no setOptNumVertices option.

Usage Examples

Single Source Shortest Path (Scatter-Gather)

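import org.apache.flink.graph.Vertex
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
import scala.collection.JavaConverters._

// Scatter function sends current distance + edge weight to neighbors
class DistanceScatter extends ScatterFunction[Long, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
    // Vertices that have not been reached yet have nothing to propagate
    if (vertex.getValue < Double.MaxValue) {
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}

// Gather function takes the minimum of the received distances
class DistanceGather extends GatherFunction[Long, Double, Double] {
  override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]): Unit = {
    var min = Double.MaxValue
    while (inMessages.hasNext) {
      min = math.min(min, inMessages.next())
    }
    // Update only when a shorter path was found, so unchanged vertices stop scattering
    if (min < vertex.getValue) {
      setNewVertexValue(min)
    }
  }
}

// Run algorithm (assumes vertex values are 0.0 for the source and Double.MaxValue elsewhere)
val shortestPaths = graph.runScatterGatherIteration(
  new DistanceScatter(),
  new DistanceGather(),
  maxIterations = 10
)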

PageRank (Vertex-Centric)

import org.apache.flink.graph.Vertex
import org.apache.flink.graph.pregel.{ComputeFunction, MessageIterator}
import scala.collection.JavaConverters._

// Assumes every vertex value is initialized to 1.0 / numVertices
class PageRankCompute(numVertices: Long, dampingFactor: Double)
    extends ComputeFunction[Long, Double, Double, Double] {

  override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
    // Superstep 1 has no incoming messages: keep the initial rank and only distribute it
    val rank =
      if (getSuperstepNumber == 1) vertex.getValue
      else {
        var sum = 0.0
        while (messages.hasNext) {
          sum += messages.next()
        }
        val newRank = (1.0 - dampingFactor) / numVertices + dampingFactor * sum
        setNewVertexValue(newRank)
        newRank
      }

    // getEdges may be called only once, so materialize the out-edges before counting them
    val outEdges = getEdges.asScala.toList
    if (outEdges.nonEmpty) {
      val share = rank / outEdges.size
      for (edge <- outEdges) {
        sendMessageTo(edge.getTarget, share)
      }
    }
  }
}

// Run PageRank
val maxIterations = 10
val pageRankResult = graph.runVertexCentricIteration(
  new PageRankCompute(graph.numberOfVertices(), 0.85),
  null, // No message combiner
  maxIterations
)

Connected Components (GSA)

import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
import org.apache.flink.types.NullValue

// Gather: read the neighbor's current component ID
class ComponentIdGather extends GatherFunction[Long, NullValue, Long] {
  override def gather(neighbor: Neighbor[Long, NullValue]): Long = {
    neighbor.getNeighborValue
  }
}

// Sum: keep the smallest gathered component ID
class ComponentIdSum extends SumFunction[Long, NullValue, Long] {
  override def sum(newValue: Long, currentValue: Long): Long = {
    math.min(newValue, currentValue)
  }
}

// Apply: adopt the smaller ID; vertices that do not call setResult keep their value
class ComponentIdApply extends ApplyFunction[Long, Long, Long] {
  override def apply(newValue: Long, currentValue: Long): Unit = {
    if (newValue < currentValue) {
      setResult(newValue)
    }
  }
}

// Run connected components. Assumes each vertex value starts as its own ID and that
// the graph is undirected (for example, obtained with graph.getUndirected()).
val components = graph.runGatherSumApplyIteration(
  new ComponentIdGather(),
  new ComponentIdSum(),
  new ComponentIdApply(),
  maxIterations = 10
)

Using Built-in Algorithms

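import org.apache.flink.graph.library.PageRank

// Use the pre-implemented PageRank algorithm. It is a Java class, so the constructor
// arguments are positional: damping factor, then maximum number of iterations.
val pageRankAlgorithm = new PageRank[Long](0.85, 10)
val pageRankResult = graph.run(pageRankAlgorithm)

// The result pairs each vertex ID with its PageRank score; its exact type (and the package
// of the PageRank class) depends on the Flink release.
// first(10) returns 10 arbitrary elements, not the highest-ranked ones.
val firstTen = pageRankResult.first(10)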