Graph Algorithms

Iterative computation models for implementing graph algorithms with the scatter-gather, gather-sum-apply, and vertex-centric paradigms, along with a framework for executing pre-built algorithms.

Capabilities

Algorithm Execution Framework

Execute pre-built algorithms and analytics on graphs with type-safe result handling.

/**
 * Run a graph algorithm on the graph.
 * @param algorithm the algorithm to run on the Graph
 * @return the result of the graph algorithm
 */
def run[T](algorithm: GraphAlgorithm[K, VV, EV, T]): T

/**
 * Run a graph analytic on the graph.
 * A GraphAnalytic is similar to a GraphAlgorithm but is terminal and results
 * are retrieved via accumulators. A Flink program has a single point of
 * execution. A GraphAnalytic defers execution to the user to allow composing
 * multiple analytics and algorithms into a single program.
 * @param analytic the analytic to run on the Graph
 */
def run[T](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]

Scatter-Gather Iterations

Vertex-centric iterative computation model where vertices scatter messages along edges and gather messages from neighbors.

/**
 * Runs a scatter-gather iteration on the graph.
 * No configuration options are provided.
 * @param scatterFunction the scatter function
 * @param gatherFunction the gather function  
 * @param maxIterations maximum number of iterations to perform
 * @return the updated Graph after the scatter-gather iteration has converged or
 *         after maxIterations iterations.
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                 gatherFunction: GatherFunction[K, VV, M],
                                 maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a scatter-gather iteration on the graph with configuration options.
 * @param scatterFunction the scatter function
 * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @return the updated Graph after the scatter-gather iteration has converged or
 *         after maxIterations iterations.
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                 gatherFunction: GatherFunction[K, VV, M],
                                 maxIterations: Int, 
                                 parameters: ScatterGatherConfiguration): Graph[K, VV, EV]
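To make the superstep semantics concrete, here is a minimal plain-Scala model of a scatter-gather loop computing shortest paths. It does not use Flink at all; `ScatterGatherSketch`, `Edge`, and `shortestPaths` are hypothetical names chosen for illustration, and the sketch assumes that every vertex is active in the first superstep and that only vertices updated in one superstep scatter in the next.

```scala
// Plain-Scala model of scatter-gather supersteps (no Flink dependency).
object ScatterGatherSketch {
  final case class Edge(src: Long, dst: Long, weight: Double)

  /** Runs shortest paths until no vertex value changes or maxIterations is reached. */
  def shortestPaths(vertexIds: Set[Long], edges: Seq[Edge], source: Long,
                    maxIterations: Int): Map[Long, Double] = {
    var values: Map[Long, Double] =
      vertexIds.map(id => id -> (if (id == source) 0.0 else Double.PositiveInfinity)).toMap
    // Assumption of this sketch: every vertex is active in the first superstep.
    var active: Set[Long] = vertexIds
    var superstep = 0
    while (active.nonEmpty && superstep < maxIterations) {
      // Scatter: each active, already reached vertex sends a candidate distance
      // along its outgoing edges.
      val messages: Seq[(Long, Double)] = for {
        e <- edges
        if active(e.src) && !values(e.src).isPosInfinity
      } yield e.dst -> (values(e.src) + e.weight)

      // Gather: each vertex keeps the smallest candidate, but only if it improves.
      val updates: Map[Long, Double] = messages
        .groupBy(_._1)
        .map { case (id, msgs) => id -> msgs.map(_._2).min }
        .filter { case (id, best) => best < values(id) }

      values = values ++ updates
      active = updates.keySet // only updated vertices scatter in the next superstep
      superstep += 1
    }
    values
  }
}
```

The loop ends either when `maxIterations` is exhausted or when a superstep produces no updates, which is the convergence condition described in the `@return` documentation above.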

Gather-Sum-Apply Iterations

Three-phase iterative computation model for graph algorithms that need to collect neighborhood information, aggregate it, and update vertex values.

/**
 * Runs a Gather-Sum-Apply iteration on the graph.
 * No configuration options are provided.
 * @param gatherFunction the gather function collects information about adjacent
 *                       vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @tparam M the intermediate type used between gather, sum and apply
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 *         after maxIterations iterations.
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M], 
                                  sumFunction: SumFunction[VV, EV, M], 
                                  applyFunction: ApplyFunction[K, VV, M], 
                                  maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
 * @param gatherFunction the gather function collects information about adjacent
 *                       vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @tparam M the intermediate type used between gather, sum and apply
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 *         after maxIterations iterations.
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M], 
                                  sumFunction: SumFunction[VV, EV, M], 
                                  applyFunction: ApplyFunction[K, VV, M], 
                                  maxIterations: Int,
                                  parameters: GSAConfiguration): Graph[K, VV, EV]
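The three phases can be sketched in plain Scala, without Flink, as a small label-propagation loop. The names `GatherSumApplySketch`, `Edge`, and `minLabels` are hypothetical, and the sketch assumes each vertex gathers from the sources of its incoming edges; it is meant only to show how gather, sum, and apply divide the work.

```scala
// Plain-Scala model of gather-sum-apply supersteps (no Flink dependency).
object GatherSumApplySketch {
  final case class Edge(src: Long, dst: Long)

  /** Propagates the smallest label along edge direction until nothing changes. */
  def minLabels(vertexIds: Set[Long], edges: Seq[Edge], maxIterations: Int): Map[Long, Long] = {
    var labels: Map[Long, Long] = vertexIds.map(id => id -> id).toMap
    var changed = true
    var superstep = 0
    while (changed && superstep < maxIterations) {
      // Gather: one partial value per edge, computed from the neighbor's current label.
      val gathered: Seq[(Long, Long)] = edges.map(e => e.dst -> labels(e.src))

      // Sum: pairwise reduction of the partial values of each target vertex.
      val summed: Map[Long, Long] = gathered
        .groupBy(_._1)
        .map { case (id, partials) => id -> partials.map(_._2).reduce((a, b) => math.min(a, b)) }

      // Apply: update a vertex only when the aggregate improves on its current value.
      val updates = summed.filter { case (id, label) => label < labels(id) }

      labels = labels ++ updates
      changed = updates.nonEmpty
      superstep += 1
    }
    labels
  }
}
```

Because the sum phase only ever sees two values of type `M` at a time, the intermediate type stays small regardless of how many neighbors a vertex has.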

Vertex-Centric Iterations

Pregel-style iterative computation model where vertices receive messages, perform computations, and send messages to neighbors.

/**
 * Runs a vertex-centric iteration on the graph.
 * No configuration options are provided.
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @return the updated Graph after the vertex-centric iteration has converged or
 *         after maxIterations iterations.
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                 combineFunction: MessageCombiner[K, M],
                                 maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a vertex-centric iteration on the graph with configuration options.
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @return the updated Graph after the vertex-centric iteration has converged or
 *         after maxIterations iterations.
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                 combineFunction: MessageCombiner[K, M],
                                 maxIterations: Int, 
                                 parameters: VertexCentricConfiguration): Graph[K, VV, EV]
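As a rough illustration of the Pregel-style model, the following plain-Scala sketch (no Flink dependency) propagates the maximum vertex value through a graph. `VertexCentricSketch`, `Outcome`, and `propagateMax` are hypothetical names; the sketch assumes that all vertices run in the first superstep, that afterwards only vertices with incoming messages run, and that the combiner collapses the messages for one target into a single message.

```scala
// Plain-Scala model of vertex-centric supersteps with a message combiner.
object VertexCentricSketch {
  final case class Edge(src: Long, dst: Long)

  /** What one compute call decides: an optional new value and outgoing messages. */
  final case class Outcome(newValue: Option[Int], outgoing: Seq[(Long, Int)])

  def propagateMax(initial: Map[Long, Int], edges: Seq[Edge], maxIterations: Int): Map[Long, Int] = {
    val outNeighbors: Map[Long, Seq[Long]] =
      edges.groupBy(_.src).map { case (id, es) => id -> es.map(_.dst) }

    // compute: sees the vertex and its combined inbox, then updates and sends in one step.
    def compute(id: Long, value: Int, inbox: Option[Int], superstep: Int): Outcome = {
      val best = inbox.fold(value)(m => math.max(value, m))
      if (superstep == 0 || best > value)
        Outcome(if (best > value) Some(best) else None,
                outNeighbors.getOrElse(id, Nil).map(_ -> best))
      else Outcome(None, Nil) // nothing new to report, so stay silent
    }

    var values = initial
    var inboxes: Map[Long, Int] = Map.empty
    var active: Set[Long] = initial.keySet // every vertex runs in the first superstep
    var superstep = 0
    while (active.nonEmpty && superstep < maxIterations) {
      val outcomes = active.toSeq.map(id => id -> compute(id, values(id), inboxes.get(id), superstep))
      values = values ++ outcomes.collect { case (id, Outcome(Some(v), _)) => id -> v }

      // Combiner: reduce all messages addressed to one target to a single message.
      inboxes = outcomes.flatMap(_._2.outgoing)
        .groupBy(_._1)
        .map { case (target, msgs) => target -> msgs.map(_._2).max }

      active = inboxes.keySet // only vertices with messages run next
      superstep += 1
    }
    values
  }
}
```

Unlike the scatter-gather model, a single `compute` call both updates the vertex and decides which messages to send, which is what gives this model its finer-grained control.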

Graph Validation

Validate graph properties and constraints using custom validators.

/**
 * Validate the graph using a GraphValidator.
 * @param validator the validator to apply
 * @return true if the graph is valid according to the validator, false otherwise
 */
def validate(validator: GraphValidator[K, VV, EV]): Boolean
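A typical validation rule is that every edge must connect two vertices that actually exist in the graph. The plain-Scala sketch below shows that check on in-memory collections; it does not use Flink, and `ValidationSketch`, `Edge`, and `hasOnlyValidVertexIds` are hypothetical names. A real validator would express the same condition over the graph's vertex and edge data sets.

```scala
// Plain-Scala model of a "no dangling edges" validation rule.
object ValidationSketch {
  final case class Edge(src: Long, dst: Long)

  /** Returns true when every edge endpoint is a known vertex ID. */
  def hasOnlyValidVertexIds(vertexIds: Set[Long], edges: Seq[Edge]): Boolean =
    edges.forall(e => vertexIds.contains(e.src) && vertexIds.contains(e.dst))
}
```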

Algorithm Types and Patterns

Function Interfaces (From Java Gelly)

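The iteration methods take these abstract base classes from the underlying Java Gelly library. The signatures below are simplified Scala renderings of the Java declarations. Several methods return Unit and publish their result through a helper method inherited from the base class, as noted in the comments:

// Scatter-Gather functions (org.apache.flink.graph.spargel)
abstract class ScatterFunction[K, VV, M, EV] {
  // Send messages with sendMessageTo(target, message)
  def sendMessages(vertex: Vertex[K, VV]): Unit
}

abstract class GatherFunction[K, VV, M] {
  // Publish the new vertex value with setNewVertexValue(value)
  def updateVertex(vertex: Vertex[K, VV], inMessages: MessageIterator[M]): Unit
}

// Gather-Sum-Apply functions (org.apache.flink.graph.gsa)
// GSAGatherFunction refers to org.apache.flink.graph.gsa.GatherFunction; it is
// renamed here to avoid a clash with the spargel GatherFunction above
abstract class GSAGatherFunction[VV, EV, M] {
  def gather(neighbor: Neighbor[VV, EV]): M
}

abstract class SumFunction[VV, EV, M] {
  def sum(arg0: M, arg1: M): M
}

abstract class ApplyFunction[K, VV, M] {
  // Publish the new vertex value with setResult(value)
  def apply(newValue: M, currentValue: VV): Unit
}

// Vertex-Centric functions (org.apache.flink.graph.pregel)
abstract class ComputeFunction[K, VV, EV, M] {
  // Update with setNewVertexValue(value) and send with sendMessageTo(target, message)
  def compute(vertex: Vertex[K, VV], messages: MessageIterator[M]): Unit
}

abstract class MessageCombiner[K, M] {
  // Emit the combined message with sendCombinedMessage(message)
  def combineMessages(messages: MessageIterator[M]): Unit
}

// Validation (org.apache.flink.graph.validation)
abstract class GraphValidator[K, VV, EV] {
  def validate(graph: Graph[K, VV, EV]): Boolean
}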

Configuration Types

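Algorithm behavior can be customized by passing one of these configuration objects as the last argument of the corresponding iteration method. Each one lets you set, among other things, the iteration name, its parallelism, aggregators, and broadcast sets:

// Configuration classes (from Java Gelly)
class ScatterGatherConfiguration   // org.apache.flink.graph.spargel
class GSAConfiguration             // org.apache.flink.graph.gsa
class VertexCentricConfiguration   // org.apache.flink.graph.pregel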
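As a configuration sketch, a `ScatterGatherConfiguration` might be filled in as follows before it is handed to the four-argument overload. The fragment needs Flink Gelly on the classpath; the iteration name and the parallelism of 4 are illustrative values, not recommendations.

```scala
import org.apache.flink.graph.EdgeDirection
import org.apache.flink.graph.spargel.ScatterGatherConfiguration

val parameters = new ScatterGatherConfiguration
parameters.setName("sssp-iteration")        // illustrative name for the iteration
parameters.setParallelism(4)                // illustrative parallelism for the iteration
parameters.setDirection(EdgeDirection.ALL)  // exchange messages along in- and out-edges
parameters.setOptNumVertices(true)          // make the vertex count available to the functions

// Passed as the fourth argument, for example:
// graph.runScatterGatherIteration(scatterFunction, gatherFunction, 10, parameters)
```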

Usage Examples:

import scala.collection.JavaConverters._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
// The GSA gather function is renamed to avoid clashing with the spargel GatherFunction
import org.apache.flink.graph.gsa.{ApplyFunction, Neighbor, SumFunction, GatherFunction => GSAGatherFunction}

// The examples assume that `graph` is a Graph[Long, Double, Double]

// Example: Single Source Shortest Path using Scatter-Gather
// Vertex values are assumed to start at 0.0 for the source and Double.MaxValue elsewhere
class SSSPScatter[K](sourceId: K) extends ScatterFunction[K, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[K, Double]): Unit = {
    if (vertex.getId == sourceId || vertex.getValue < Double.MaxValue) {
      // getEdges returns a java.lang.Iterable, so convert it before iterating
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}

class SSSPGather[K] extends GatherFunction[K, Double, Double] {
  override def updateVertex(vertex: Vertex[K, Double],
                            inMessages: MessageIterator[Double]): Unit = {
    var minDistance = vertex.getValue
    while (inMessages.hasNext) {
      val msg = inMessages.next()
      if (msg < minDistance) {
        minDistance = msg
      }
    }
    // updateVertex returns Unit; update only on improvement so the iteration can converge
    if (minDistance < vertex.getValue) {
      setNewVertexValue(minDistance)
    }
  }
}

// Run SSSP algorithm
val sourceId = 1L
val scatterFunction = new SSSPScatter[Long](sourceId)
val gatherFunction = new SSSPGather[Long]()
val result = graph.runScatterGatherIteration(scatterFunction, gatherFunction, 10)

// Example: PageRank using Gather-Sum-Apply
// Neighbor exposes only the neighbor value and the edge value, so each edge value
// is assumed to hold 1 / outDegree of its source vertex
class PageRankGather extends GSAGatherFunction[Double, Double, Double] {
  override def gather(neighbor: Neighbor[Double, Double]): Double = {
    neighbor.getNeighborValue * neighbor.getEdgeValue
  }
}

class PageRankSum extends SumFunction[Double, Double, Double] {
  override def sum(arg0: Double, arg1: Double): Double = arg0 + arg1
}

class PageRankApply[K](dampingFactor: Double) extends ApplyFunction[K, Double, Double] {
  override def apply(newValue: Double, currentValue: Double): Unit = {
    // apply returns Unit; the new rank is published through setResult
    setResult((1.0 - dampingFactor) + dampingFactor * newValue)
  }
}

// Run PageRank algorithm
val gatherFunc = new PageRankGather()
val sumFunc = new PageRankSum()
val applyFunc = new PageRankApply[Long](0.85)
val pageRankResult = graph.runGatherSumApplyIteration(gatherFunc, sumFunc, applyFunc, 10)

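// Example: Using built-in algorithms
// The vertex ID type is fixed to Long here to match sourceId
import org.apache.flink.graph.library.SingleSourceShortestPaths
// In recent Flink releases the three-type-parameter PageRank lives in the
// linkanalysis subpackage; adjust the import if your version differs
import org.apache.flink.graph.library.linkanalysis.PageRank

val ssspAlgorithm = new SingleSourceShortestPaths[Long, Double](sourceId, 10)
val distances = graph.run(ssspAlgorithm)

val pageRankAlgorithm = new PageRank[Long, Double, Double](0.85, 10)
val pageRankValues = graph.run(pageRankAlgorithm)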

Algorithm Design Patterns

Scatter-Gather Pattern

Best for algorithms where:

  • Vertices need to send information to their neighbors
  • Each vertex processes messages from its neighbors independently
  • Examples: Single Source Shortest Path, Connected Components

Gather-Sum-Apply Pattern

Best for algorithms where:

  • You need to collect and aggregate information from neighbors
  • The aggregation can be expressed as an associative and commutative operation, so partial results can be combined in any order
  • Examples: PageRank, Triangle Counting
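The requirement on the aggregation matters because a distributed runtime may combine partial results in any grouping. The plain-Scala sketch below (no Flink dependency; `SumOrderSketch` and `reduceInPartitions` are hypothetical names) reduces the same values under two different partitionings and shows that addition is stable while subtraction is not.

```scala
// Why the sum step should not depend on how values are grouped.
object SumOrderSketch {
  /** Reduces each partition first, then reduces the partial results. */
  def reduceInPartitions(partitions: Seq[Seq[Double]], op: (Double, Double) => Double): Double =
    partitions.map(_.reduce(op)).reduce(op)

  val values: Seq[Double] = Seq(8.0, 4.0, 2.0, 1.0)
  val add: (Double, Double) => Double = _ + _
  val subtract: (Double, Double) => Double = _ - _

  // Addition: the grouping does not matter.
  val addOnePartition: Double = reduceInPartitions(Seq(values), add)
  val addTwoPartitions: Double = reduceInPartitions(Seq(values.take(2), values.drop(2)), add)

  // Subtraction: ((8 - 4) - 2) - 1 differs from (8 - 4) - (2 - 1).
  val subtractOnePartition: Double = reduceInPartitions(Seq(values), subtract)
  val subtractTwoPartitions: Double = reduceInPartitions(Seq(values.take(2), values.drop(2)), subtract)
}
```

An operation like subtraction would therefore give results that depend on how the runtime happens to split the neighborhood, which is why it is unsuitable as a sum function.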

Vertex-Centric Pattern

Best for algorithms where:

  • Vertices need fine-grained control over message sending
  • Complex message processing is required
  • Examples: Graph Coloring, Community Detection

Performance Considerations

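  • Convergence: Every iteration method stops after maxIterations supersteps, or earlier once a superstep produces no vertex updates, so pick a limit that bounds runtime without cutting off convergence
  • Configuration: Use the configuration objects to set the iteration's parallelism, register aggregators and broadcast sets, and control how the solution set is kept in memory
  • Message Types: Choose compact message types (M) to minimize serialization overhead, since messages are exchanged in every superstep
  • Partitioning: Consider graph partitioning strategies for large graphs to reduce the data shuffled between supersteps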
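  • Fault tolerance: Gelly runs on Flink's batch DataSet API, where failures are handled by restarting the job rather than by streaming checkpoints, so for long-running iterative algorithms consider persisting intermediate results between runs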