Iterations

The Apache Flink Scala API supports iterative algorithms through two patterns, bulk iteration and delta iteration, enabling efficient implementation of graph algorithms, machine learning, and other iterative computations.

Bulk Iteration

Bulk iteration repeatedly applies a transformation function to the entire dataset until a maximum number of iterations is reached or a termination condition is met.

Basic Bulk Iteration

class DataSet[T] {
  // Simple iteration with fixed number of iterations
  def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
}

Bulk Iteration with Termination

class DataSet[T] {
  // Iteration with a termination condition. The step function returns the
  // next solution together with a termination DataSet; the iteration stops
  // early when that DataSet becomes empty, or after maxIterations
  def iterateWithTermination(maxIterations: Int)(
    stepFunction: DataSet[T] => (DataSet[T], DataSet[_])
  ): DataSet[T]
}

Delta Iteration

Delta iteration is optimized for scenarios where only a small portion of the data changes in each iteration. It maintains a solution set (the complete state) and a workset (the elements that may trigger updates). The step function returns a solution set delta, which Flink merges into the solution set by key, together with the next workset; the iteration ends when the workset becomes empty or maxIterations is reached.

class DataSet[T] {
  // Delta iteration with string-based key fields (case classes and POJOs).
  // The step function maps (solution set, workset) to (solution set delta, next workset)
  def iterateDelta[R: TypeInformation: ClassTag](
    workset: DataSet[R], 
    maxIterations: Int, 
    keyFields: Array[String]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
  
  // Delta iteration with position-based key fields (tuple types)
  def iterateDelta[R: TypeInformation: ClassTag](
    workset: DataSet[R], 
    maxIterations: Int, 
    keyFields: Array[Int]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
}

Usage Examples

Simple Bulk Iteration - Powers of 2

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Start with initial value
val initial = env.fromElements(1)

// Iterate to compute powers of 2
val result = initial.iterate(10) { current =>
  current.map(_ * 2)
}

// Result will be 1 * 2^10 = 1024
result.print()
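
Conceptually, iterate(10) behaves like applying the step function ten times in a row; Flink simply executes it as a single native iterative dataflow instead of the unrolled plan sketched below:

// For intuition only: this builds a chain of 10 map operators rather than
// a native iteration
val unrolled = (1 to 10).foldLeft(initial) { (ds, _) => ds.map(_ * 2) }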

Bulk Iteration with Termination - Convergence

import org.apache.flink.api.scala._

case class Point(x: Double, y: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial points
val initialPoints = env.fromElements(
  Point(1.0, 1.0),
  Point(2.0, 2.0),
  Point(3.0, 3.0)
)

// Iterate until points converge to origin (0,0)
val convergedPoints = initialPoints.iterateWithTermination(100) { points =>
  // Step function: move points closer to origin
  val newPoints = points.map { p =>
    Point(p.x * 0.9, p.y * 0.9)
  }
  
  // Termination criterion: points still farther than 0.01 from the origin;
  // the iteration stops as soon as this DataSet becomes empty
  val terminationCriterion = newPoints.filter { p =>
    math.sqrt(p.x * p.x + p.y * p.y) > 0.01
  }
  
  (newPoints, terminationCriterion)
}

Delta Iteration - Single Source Shortest Path

import org.apache.flink.api.scala._

case class Vertex(id: Long, distance: Double)
case class Edge(src: Long, dst: Long, weight: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Graph vertices with initial distances (infinity except source)
val vertices = env.fromElements(
  Vertex(1L, 0.0),    // Source vertex
  Vertex(2L, Double.PositiveInfinity),
  Vertex(3L, Double.PositiveInfinity),
  Vertex(4L, Double.PositiveInfinity)
)

// Graph edges
val edges = env.fromElements(
  Edge(1L, 2L, 1.0),
  Edge(1L, 3L, 4.0),
  Edge(2L, 3L, 2.0),
  Edge(2L, 4L, 5.0),
  Edge(3L, 4L, 1.0)
)

// Initial workset: only source vertex
val initialWorkset = env.fromElements(Vertex(1L, 0.0))

// Run delta iteration
val shortestPaths = vertices.iterateDelta(initialWorkset, 10, Array("id")) {
  (solutionSet, workset) =>
    
    // Join workset with edges to find candidate distances for neighboring
    // vertices, keeping only the shortest candidate per target vertex
    val candidates = workset
      .join(edges)
      .where("id")
      .equalTo("src")
      .apply { (vertex, edge) =>
        Vertex(edge.dst, vertex.distance + edge.weight)
      }
      .groupBy("id")
      .min("distance")
    
    // Join candidates with the current solution set and keep only actual
    // improvements; the solution set must be joined on its key fields
    val updates = candidates
      .join(solutionSet)
      .where("id")
      .equalTo("id")
      .flatMap { case (candidate, current) =>
        if (candidate.distance < current.distance) Some(candidate) else None
      }
    
    // Return (solution set delta, next workset): Flink merges the delta into
    // the solution set by key, so no manual outer join is needed, and the
    // updated vertices are processed again in the next superstep
    (updates, updates)
}
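
As with any DataSet program, the iteration only runs once a sink is attached:

// print() adds a sink and eagerly triggers execution of the iterative job
shortestPaths.print()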

PageRank Algorithm

import org.apache.flink.api.scala._

case class Page(id: Long, rank: Double)
case class Link(src: Long, dst: Long)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial page ranks (equal distribution)
val pages = env.fromElements(
  Page(1L, 1.0),
  Page(2L, 1.0),
  Page(3L, 1.0),
  Page(4L, 1.0)
)

// Page links
val links = env.fromElements(
  Link(1L, 2L),
  Link(1L, 3L),
  Link(2L, 3L),
  Link(3L, 1L),
  Link(3L, 4L),
  Link(4L, 1L)
)

val dampingFactor = 0.85
val numPages = 4

// Calculate out-degrees for each page
val outDegrees = links
  .groupBy(_.src)
  .reduceGroup { group =>
    val linkList = group.toList
    (linkList.head.src, linkList.length)
  }

// Run PageRank iteration
val pageRanks = pages.iterate(10) { currentRanks =>
  
  // Distribute each page's rank evenly over its outgoing links
  val contributions = currentRanks
    .join(outDegrees)
    .where(_.id)
    .equalTo(_._1)
    .apply { (page, outDegree) =>
      (page.id, page.rank / outDegree._2)
    }
    .join(links)
    .where(_._1)
    .equalTo(_.src)
    .apply { (contrib, link) =>
      (link.dst, contrib._2)
    }
  
  // Aggregate contributions and apply the PageRank formula. Note that pages
  // with no incoming links receive no contributions and drop out of this
  // simplified version; a full implementation would outer-join against all pages
  val newRanks = contributions
    .groupBy(_._1)
    .reduceGroup { contribs =>
      val contribList = contribs.toList
      val pageId = contribList.head._1
      val totalContrib = contribList.map(_._2).sum
      Page(pageId, (1.0 - dampingFactor) / numPages + dampingFactor * totalContrib)
    }
  
  newRanks
}

Connected Components with Delta Iteration

import org.apache.flink.api.scala._

case class ComponentVertex(id: Long, componentId: Long)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial vertices (each vertex is its own component)
val vertices = env.fromElements(
  ComponentVertex(1L, 1L),
  ComponentVertex(2L, 2L),
  ComponentVertex(3L, 3L),
  ComponentVertex(4L, 4L),
  ComponentVertex(5L, 5L)
)

val edges = env.fromElements(
  (1L, 2L), (2L, 3L), (4L, 5L)
)

// Initial workset: all vertices
val initialWorkset = vertices

val components = vertices.iterateDelta(initialWorkset, 10, Array("id")) {
  (solutionSet, workset) =>
    
    // Propagate each vertex's component ID to its neighbors in both directions
    val candidates = workset
      .join(edges)
      .where("id")
      .equalTo("_1")
      .apply { (vertex, edge) =>
        ComponentVertex(edge._2, vertex.componentId)
      }
      .union(
        workset
          .join(edges)
          .where("id")
          .equalTo("_2")
          .apply { (vertex, edge) =>
            ComponentVertex(edge._1, vertex.componentId)
          }
      )
    
    // Find the minimum candidate component ID for each vertex
    val minCandidates = candidates
      .groupBy("id")
      .min("componentId")
    
    // Keep only candidates that improve on the current solution; the
    // solution set must be joined on its key fields
    val updates = minCandidates
      .join(solutionSet)
      .where("id")
      .equalTo("id")
      .flatMap { case (candidate, current) =>
        if (candidate.componentId < current.componentId) Some(candidate) else None
      }
    
    // The delta is merged into the solution set by key; the updated vertices
    // form the next workset
    (updates, updates)
}

K-Means Clustering

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

case class Point2D(x: Double, y: Double)
case class Centroid(id: Int, x: Double, y: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Data points
val points = env.fromElements(
  Point2D(1.0, 1.0), Point2D(1.5, 1.5), Point2D(2.0, 2.0),
  Point2D(8.0, 8.0), Point2D(8.5, 8.5), Point2D(9.0, 9.0)
)

// Initial centroids
val initialCentroids = env.fromElements(
  Centroid(0, 0.0, 0.0),
  Centroid(1, 5.0, 5.0)
)

// K-means iteration: each superstep re-assigns points to the nearest
// centroid and recomputes the centroid positions
val finalCentroids = initialCentroids.iterate(20) { centroids =>
  
  // Assign each point to its nearest centroid. The current centroids are a
  // small DataSet, so they are shipped to every task as a broadcast set
  val assignments = points
    .map(new RichMapFunction[Point2D, (Int, Point2D)] {
      private var centroidList: Seq[Centroid] = _
      
      // The broadcast set is materialized once per task, not per element
      override def open(parameters: Configuration): Unit = {
        centroidList = getRuntimeContext
          .getBroadcastVariable[Centroid]("centroids")
          .asScala.toSeq
      }
      
      override def map(p: Point2D): (Int, Point2D) = {
        val nearest = centroidList.minBy { c =>
          val dx = p.x - c.x
          val dy = p.y - c.y
          dx * dx + dy * dy
        }
        (nearest.id, p)
      }
    })
    .withBroadcastSet(centroids, "centroids")
  
  // Calculate new centroids as the mean of their assigned points
  val newCentroids = assignments
    .groupBy(_._1)
    .reduceGroup { group =>
      val assignmentList = group.toList
      val centroidId = assignmentList.head._1
      val pointSum = assignmentList.map(_._2).reduce { (p1, p2) =>
        Point2D(p1.x + p2.x, p1.y + p2.y)
      }
      val count = assignmentList.length
      Centroid(centroidId, pointSum.x / count, pointSum.y / count)
    }
  
  newCentroids
}

Best Practices

Performance Considerations

  1. Use Delta Iteration for Sparse Updates: When only a small fraction of the data changes per iteration
  2. Choose Appropriate Termination Conditions: Balance accuracy against performance
  3. Optimize Broadcast Variables: Use them for small lookup tables inside iterations (see the sketch below)
  4. Consider Checkpointing: For long-running iterations to handle failures
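
A minimal sketch of the broadcast-variable pattern from item 3, using hypothetical values and offsets datasets; the small lookup DataSet is attached with withBroadcastSet and read once per task in open, the same pattern the K-Means example above uses:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

val env = ExecutionEnvironment.getExecutionEnvironment

val values = env.fromElements(1L, 2L, 3L)
val offsets = env.fromElements((1L, 10L), (3L, 30L)) // small lookup table

val shifted = values.iterate(3) { current =>
  current
    .map(new RichMapFunction[Long, Long] {
      private var offsetTable: Map[Long, Long] = _
      
      // Read the broadcast set once per task, not once per element
      override def open(parameters: Configuration): Unit = {
        offsetTable = getRuntimeContext
          .getBroadcastVariable[(Long, Long)]("offsets")
          .asScala.toMap
      }
      
      override def map(v: Long): Long = v + offsetTable.getOrElse(v, 0L)
    })
    .withBroadcastSet(offsets, "offsets")
}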

Common Patterns

  1. Graph Algorithms: Use delta iteration with a vertex-centric approach
  2. Machine Learning: Use bulk iteration for gradient descent and similar algorithms
  3. Convergence Detection: Implement custom termination criteria based on convergence metrics (sketched below)
  4. State Management: Use the solution set in delta iteration to maintain complete state
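
To illustrate item 3, the following sketch (with a placeholder computeStep standing in for the real transformation) stops as soon as no value changes by more than a small epsilon between supersteps:

import org.apache.flink.api.scala._

case class Rank(id: Long, value: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val initialRanks = env.fromElements(Rank(1L, 1.0), Rank(2L, 1.0))

// Placeholder step; a real job would compute the next values here
def computeStep(ranks: DataSet[Rank]): DataSet[Rank] =
  ranks.map(r => Rank(r.id, r.value * 0.5))

val epsilon = 0.0001

val converged = initialRanks.iterateWithTermination(100) { current =>
  val next = computeStep(current)
  
  // Termination set: elements still changing by more than epsilon; the
  // iteration stops as soon as this DataSet becomes empty
  val stillChanging = next
    .join(current)
    .where(_.id)
    .equalTo(_.id)
    .flatMap { case (n, c) =>
      if (math.abs(n.value - c.value) > epsilon) Some(n) else None
    }
  
  (next, stillChanging)
}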