The Apache Flink Scala DataSet API supports iterative algorithms through bulk iteration and delta iteration, enabling efficient implementation of graph algorithms, machine learning algorithms, and other iterative computations.
Bulk iteration repeatedly applies a transformation function to the entire dataset until a maximum number of iterations is reached or a termination condition is met.
class DataSet[T] {
  // Simple iteration with a fixed maximum number of iterations
  def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
}

class DataSet[T] {
  // Iteration with a termination criterion: the loop stops as soon as the
  // termination DataSet returned by the step function is empty, or when
  // maxIterations is reached
  def iterateWithTermination(maxIterations: Int)(
      stepFunction: DataSet[T] => (DataSet[T], DataSet[_])
  ): DataSet[T]
}

Delta iteration is optimized for scenarios where only a small portion of the data changes from one iteration to the next. It maintains a solution set (the complete state, indexed by key) and a workset (the elements that may trigger updates in the next superstep). The step function returns the solution set delta, which Flink merges into the solution set by key, together with the new workset; the iteration ends when the workset becomes empty or the maximum number of iterations is reached. A minimal sketch of this contract follows the method signatures below.
class DataSet[T] {
  // Delta iteration with string-based key fields
  def iterateDelta[R: TypeInformation: ClassTag](
      workset: DataSet[R],
      maxIterations: Int,
      keyFields: Array[String]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]

  // Delta iteration with integer-based key fields
  def iterateDelta[R: TypeInformation: ClassTag](
      workset: DataSet[R],
      maxIterations: Int,
      keyFields: Array[Int]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
}
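To make the delta-iteration contract concrete, here is a minimal, self-contained sketch; the element type, key field, and update rule are illustrative assumptions rather than part of the Flink API:

import org.apache.flink.api.scala._

// Hypothetical element type; "id" serves as the solution set key
case class Entry(id: Long, value: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val initialSolution = env.fromElements(Entry(1L, 10.0), Entry(2L, 20.0))
val initialWorkset = initialSolution

val finalSolution = initialSolution.iterateDelta(initialWorkset, 5, Array("id")) {
  (solutionSet, workset) =>
    // Illustrative update rule: halve the value of every workset element
    val delta = workset.map(e => Entry(e.id, e.value / 2))
    // Keep iterating only on elements whose value is still "large"
    val nextWorkset = delta.filter(_.value > 1.0)
    // Flink merges `delta` into the solution set by the "id" key;
    // the iteration stops once `nextWorkset` is empty
    (delta, nextWorkset)
}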
The following example uses bulk iteration with a fixed number of iterations to repeatedly double a value:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
// Start with initial value
val initial = env.fromElements(1)
// Iterate to compute powers of 2
val result = initial.iterate(10) { current =>
  current.map(_ * 2)
}
// Result will be 1 * 2^10 = 1024
result.print()

The next example uses iterateWithTermination so that the loop can stop before the maximum number of iterations once a convergence criterion is satisfied:

import org.apache.flink.api.scala._
case class Point(x: Double, y: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
// Initial points
val initialPoints = env.fromElements(
  Point(1.0, 1.0),
  Point(2.0, 2.0),
  Point(3.0, 3.0)
)
// Iterate until points converge to origin (0,0)
val convergedPoints = initialPoints.iterateWithTermination(100) { points =>
  // Step function: move every point 10% closer to the origin
  val newPoints = points.map { p =>
    Point(p.x * 0.9, p.y * 0.9)
  }
  // Termination criterion: points that are still too far from the origin.
  // The iteration stops as soon as this DataSet becomes empty.
  val terminationCriterion = newPoints.filter { p =>
    math.sqrt(p.x * p.x + p.y * p.y) > 0.01
  }
  (newPoints, terminationCriterion)
}

The single-source shortest paths example below uses delta iteration: the solution set holds the best known distance per vertex, and the workset holds the vertices whose distance improved in the previous superstep.

import org.apache.flink.api.scala._
case class Vertex(id: Long, distance: Double)
case class Edge(src: Long, dst: Long, weight: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
// Graph vertices with initial distances (infinity except source)
val vertices = env.fromElements(
  Vertex(1L, 0.0), // Source vertex
  Vertex(2L, Double.PositiveInfinity),
  Vertex(3L, Double.PositiveInfinity),
  Vertex(4L, Double.PositiveInfinity)
)
// Graph edges
val edges = env.fromElements(
  Edge(1L, 2L, 1.0),
  Edge(1L, 3L, 4.0),
  Edge(2L, 3L, 2.0),
  Edge(2L, 4L, 5.0),
  Edge(3L, 4L, 1.0)
)
// Initial workset: only source vertex
val initialWorkset = env.fromElements(Vertex(1L, 0.0))
// Run delta iteration
val shortestPaths = vertices.iterateDelta(initialWorkset, 10, Array("id")) {
  (solutionSet, workset) =>
    // Join the workset with the edges to produce candidate distances for the
    // neighbors of every vertex that changed in the previous superstep
    val candidates = workset
      .join(edges)
      .where(_.id)
      .equalTo(_.src)
      .apply { (vertex, edge) =>
        Vertex(edge.dst, vertex.distance + edge.weight)
      }
    // Keep only the shortest candidate distance per target vertex
    val minCandidates = candidates
      .groupBy(_.id)
      .reduce { (a, b) => if (a.distance < b.distance) a else b }
    // Keep only candidates that improve on the current solution
    // (the solution set must be joined on its key field "id")
    val updates = minCandidates
      .join(solutionSet)
      .where("id")
      .equalTo("id")
      .filter { pair => pair._1.distance < pair._2.distance }
      .map(_._1)
    // Return the solution set delta and the new workset: Flink merges the delta
    // into the solution set by key, and only the updated vertices need to be
    // revisited in the next superstep
    (updates, updates)
}

The following bulk-iteration example implements a simplified PageRank:

import org.apache.flink.api.scala._
case class Page(id: Long, rank: Double)
case class Link(src: Long, dst: Long)
val env = ExecutionEnvironment.getExecutionEnvironment
// Initial page ranks (equal distribution)
val pages = env.fromElements(
  Page(1L, 1.0),
  Page(2L, 1.0),
  Page(3L, 1.0),
  Page(4L, 1.0)
)
// Page links
val links = env.fromElements(
  Link(1L, 2L),
  Link(1L, 3L),
  Link(2L, 3L),
  Link(3L, 1L),
  Link(3L, 4L),
  Link(4L, 1L)
)
val dampingFactor = 0.85
val numPages = 4
// Calculate out-degrees for each page
val outDegrees = links
  .groupBy(_.src)
  .reduceGroup { outgoing =>
    val linkList = outgoing.toList
    val srcId = linkList.head.src
    val degree = linkList.length
    (srcId, degree)
  }
// Run PageRank iteration
val pageRanks = pages.iterate(10) { currentRanks =>
  // Each page contributes rank / out-degree along every outgoing link
  val contributions = currentRanks
    .join(outDegrees)
    .where(_.id)
    .equalTo(_._1)
    .apply { (page, outDegree) =>
      (page.id, page.rank / outDegree._2)
    }
    .join(links)
    .where(_._1)
    .equalTo(_.src)
    .apply { (contribution, link) =>
      (link.dst, contribution._2)
    }
  // Aggregate incoming contributions and apply the PageRank formula
  val newRanks = contributions
    .groupBy(_._1)
    .reduceGroup { contribs =>
      val contribList = contribs.toList
      val pageId = contribList.head._1
      val totalContrib = contribList.map(_._2).sum
      Page(pageId, (1.0 - dampingFactor) / numPages + dampingFactor * totalContrib)
    }
  newRanks
}
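The PageRank example above always runs ten supersteps. A common refinement stops as soon as the ranks stop changing, by comparing each iteration's output with its input under iterateWithTermination. The sketch below shows only that pattern; the Rank type, the epsilon threshold, and the trivial update rule are illustrative stand-ins for the real PageRank step:

import org.apache.flink.api.scala._

case class Rank(id: Long, value: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val epsilon = 1e-4 // assumed convergence threshold

val initialRanks = env.fromElements(Rank(1L, 1.0), Rank(2L, 1.0))

val convergedRanks = initialRanks.iterateWithTermination(100) { ranks =>
  // Stand-in update rule; a real program would use the contribution and
  // aggregation steps from the PageRank example above
  val newRanks = ranks.map(r => Rank(r.id, 0.15 + 0.85 * r.value * 0.5))

  // Termination criterion: ranks that still moved by more than epsilon.
  // The iteration stops as soon as this DataSet is empty.
  val stillChanging = newRanks
    .join(ranks)
    .where(_.id)
    .equalTo(_.id)
    .filter { pair => math.abs(pair._1.value - pair._2.value) > epsilon }

  (newRanks, stillChanging)
}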
The next delta iteration computes connected components by propagating the minimum component ID along edges:

import org.apache.flink.api.scala._

case class ComponentVertex(id: Long, componentId: Long)
val env = ExecutionEnvironment.getExecutionEnvironment
// Initial vertices (each vertex is its own component)
val vertices = env.fromElements(
  ComponentVertex(1L, 1L),
  ComponentVertex(2L, 2L),
  ComponentVertex(3L, 3L),
  ComponentVertex(4L, 4L),
  ComponentVertex(5L, 5L)
)
val edges = env.fromElements(
  (1L, 2L), (2L, 3L), (4L, 5L)
)
// Initial workset: all vertices
val initialWorkset = vertices
val components = vertices.iterateDelta(initialWorkset, 10, Array("id")) {
  (solutionSet, workset) =>
    // Propagate each vertex's component ID to its neighbors in both directions
    val candidates = workset
      .join(edges)
      .where(_.id)
      .equalTo(_._1)
      .apply { (vertex, edge) =>
        ComponentVertex(edge._2, vertex.componentId)
      }
      .union(
        workset
          .join(edges)
          .where(_.id)
          .equalTo(_._2)
          .apply { (vertex, edge) =>
            ComponentVertex(edge._1, vertex.componentId)
          }
      )
    // Find the minimum candidate component ID for each vertex
    val minCandidates = candidates
      .groupBy(_.id)
      .reduce { (a, b) => if (a.componentId < b.componentId) a else b }
    // Keep only candidates that improve on the current solution
    // (the solution set must be joined on its key field "id")
    val updates = minCandidates
      .join(solutionSet)
      .where("id")
      .equalTo("id")
      .filter { pair => pair._1.componentId < pair._2.componentId }
      .map(_._1)
    // Return the solution set delta and the new workset; Flink merges the
    // delta into the solution set by key
    (updates, updates)
}

Bulk iteration can also drive machine learning algorithms such as k-means clustering:

import org.apache.flink.api.scala._
case class Point2D(x: Double, y: Double)
case class Centroid(id: Int, x: Double, y: Double)
val env = ExecutionEnvironment.getExecutionEnvironment
// Data points
val points = env.fromElements(
  Point2D(1.0, 1.0), Point2D(1.5, 1.5), Point2D(2.0, 2.0),
  Point2D(8.0, 8.0), Point2D(8.5, 8.5), Point2D(9.0, 9.0)
)
// Initial centroids
val initialCentroids = env.fromElements(
  Centroid(0, 0.0, 0.0),
  Centroid(1, 5.0, 5.0)
)
// K-means iteration
val finalCentroids = initialCentroids.iterate(20) { centroids =>
  // Assign each point to its nearest centroid by comparing it against all
  // current centroids (squared Euclidean distance)
  val assignments = points
    .cross(centroids)
    .map { pair =>
      val (point, centroid) = pair
      val dx = point.x - centroid.x
      val dy = point.y - centroid.y
      (centroid.id, point, dx * dx + dy * dy)
    }
    .groupBy(_._2)
    .reduce { (a, b) => if (a._3 < b._3) a else b }
  // Recompute each centroid as the mean of its assigned points
  val newCentroids = assignments
    .groupBy(_._1)
    .reduceGroup { assigned =>
      val assignmentList = assigned.toList
      val centroidId = assignmentList.head._1
      val pointSum = assignmentList.map(_._2).reduce { (p1, p2) =>
        Point2D(p1.x + p2.x, p1.y + p2.y)
      }
      val count = assignmentList.length
      Centroid(centroidId, pointSum.x / count, pointSum.y / count)
    }
  newCentroids
}
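In larger programs the nearest-centroid assignment is typically written with a broadcast set instead of a cross, so each point is processed once against a locally cached copy of the current centroids. A minimal sketch of that pattern follows; the class name and the broadcast-set name "centroids" are arbitrary choices for illustration:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

// Maps a point to (nearest centroid id, point), reading the current centroids
// from a broadcast set registered under the name "centroids"
class SelectNearestCentroid extends RichMapFunction[Point2D, (Int, Point2D)] {
  private var centroids: Seq[Centroid] = _

  override def open(parameters: Configuration): Unit = {
    centroids = getRuntimeContext
      .getBroadcastVariable[Centroid]("centroids")
      .asScala
      .toSeq
  }

  override def map(point: Point2D): (Int, Point2D) = {
    val nearest = centroids.minBy { c =>
      val dx = point.x - c.x
      val dy = point.y - c.y
      dx * dx + dy * dy
    }
    (nearest.id, point)
  }
}

// Inside the iterate step function, the assignment step would then become:
// val assignments = points
//   .map(new SelectNearestCentroid)
//   .withBroadcastSet(centroids, "centroids")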