# Iterations

The Apache Flink Scala API supports iterative algorithms through bulk iteration and delta iteration, enabling efficient implementation of graph algorithms, machine learning algorithms, and other iterative computations.

## Bulk Iteration

Bulk iteration repeatedly applies a transformation function to the entire dataset until a maximum number of iterations is reached or a termination condition is met.
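Conceptually, bulk iteration behaves like folding the step function over the dataset a fixed number of times. A plain-Scala sketch (no Flink involved) of that contract:

```scala
// Bulk-iteration analogy: apply the step function 10 times to the data
val step: List[Int] => List[Int] = _.map(_ * 2)
val result = (1 to 10).foldLeft(List(1))((data, _) => step(data))
// result == List(1024)
```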

### Basic Bulk Iteration

```scala { .api }
class DataSet[T] {
  // Runs the step function for a fixed number of iterations
  def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
}
```

### Bulk Iteration with Termination

```scala { .api }
class DataSet[T] {
  // Runs the step function until maxIterations is reached or the
  // termination DataSet returned by the step function becomes empty
  def iterateWithTermination(maxIterations: Int)(
    stepFunction: DataSet[T] => (DataSet[T], DataSet[_])
  ): DataSet[T]
}
```
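The termination DataSet controls when the loop stops: the iteration ends after the first round in which that DataSet is empty. A plain-Scala sketch (no Flink involved) of the same contract:

```scala
// Termination analogy: keep iterating while the "termination" collection
// is non-empty, up to a maximum of 50 rounds
var data = List(100.0)
var rounds = 0
var terminated = false
while (rounds < 50 && !terminated) {
  val next = data.map(_ / 2.0)
  val termination = next.filter(_ > 1.0) // non-empty => keep iterating
  terminated = termination.isEmpty
  data = next
  rounds += 1
}
// rounds == 7, data == List(0.78125)
```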

## Delta Iteration

Delta iteration is optimized for scenarios where only a small portion of the data changes in each iteration. It maintains a solution set (the complete state, keyed by the fields passed to `iterateDelta`) and a workset (the elements that may trigger updates). The step function returns a solution set delta, which Flink merges into the solution set by key, and the next workset; the iteration ends when the workset is empty or the maximum number of iterations is reached.
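A plain-Scala sketch (no Flink involved) of the delta-iteration contract, using a tiny shortest-path example: the solution set is a keyed map, each step produces a delta that is merged in by key, and the loop ends when the workset is empty:

```scala
case class V(id: Long, dist: Double)
val edges = List((1L, 2L, 1.0), (2L, 3L, 2.0))

// Solution set: complete state, keyed by vertex id
var solution = Map(
  1L -> V(1L, 0.0),
  2L -> V(2L, Double.PositiveInfinity),
  3L -> V(3L, Double.PositiveInfinity)
)
// Workset: elements that may trigger updates
var workset = List(V(1L, 0.0))

while (workset.nonEmpty) {
  // Step: candidate distances reachable from the workset
  val candidates = for {
    v <- workset
    (src, dst, w) <- edges if src == v.id
  } yield V(dst, v.dist + w)

  // Delta: candidates that improve on the current solution
  val delta = candidates.filter(c => c.dist < solution(c.id).dist)

  // Merge the delta into the solution set by key (Flink does this
  // automatically using the keyFields passed to iterateDelta)
  solution = solution ++ delta.map(v => v.id -> v)
  workset = delta
}
// solution(3L).dist == 3.0
```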

```scala { .api }
class DataSet[T] {
  // Delta iteration keyed by field names; the step function returns
  // (solution set delta, new workset)
  def iterateDelta[R: TypeInformation: ClassTag](
    workset: DataSet[R],
    maxIterations: Int,
    keyFields: Array[String]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]

  // Delta iteration keyed by tuple field positions
  def iterateDelta[R: TypeInformation: ClassTag](
    workset: DataSet[R],
    maxIterations: Int,
    keyFields: Array[Int]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
}
```

## Usage Examples

### Simple Bulk Iteration - Powers of 2

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Start with the initial value
val initial = env.fromElements(1)

// Double the value in each of 10 iterations
val result = initial.iterate(10) { current =>
  current.map(_ * 2)
}

// Prints 1024 (1 * 2^10)
result.print()
```

### Bulk Iteration with Termination - Convergence

```scala
import org.apache.flink.api.scala._

case class Point(x: Double, y: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial points
val initialPoints = env.fromElements(
  Point(1.0, 1.0),
  Point(2.0, 2.0),
  Point(3.0, 3.0)
)

// Shrink the points toward the origin until all are within 0.01 of it
val convergedPoints = initialPoints.iterateWithTermination(100) { points =>
  // Step function: move points closer to the origin
  val newPoints = points.map { p =>
    Point(p.x * 0.9, p.y * 0.9)
  }

  // Termination criterion: the iteration stops once this DataSet is
  // empty, i.e. no point is farther than 0.01 from the origin
  val terminationCriterion = newPoints.filter { p =>
    math.sqrt(p.x * p.x + p.y * p.y) > 0.01
  }

  (newPoints, terminationCriterion)
}
```

### Delta Iteration - Single Source Shortest Path

```scala
import org.apache.flink.api.scala._

case class Vertex(id: Long, distance: Double)
case class Edge(src: Long, dst: Long, weight: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Graph vertices with initial distances (infinity except the source)
val vertices = env.fromElements(
  Vertex(1L, 0.0), // Source vertex
  Vertex(2L, Double.PositiveInfinity),
  Vertex(3L, Double.PositiveInfinity),
  Vertex(4L, Double.PositiveInfinity)
)

// Graph edges
val edges = env.fromElements(
  Edge(1L, 2L, 1.0),
  Edge(1L, 3L, 4.0),
  Edge(2L, 3L, 2.0),
  Edge(2L, 4L, 5.0),
  Edge(3L, 4L, 1.0)
)

// Initial workset: only the source vertex
val initialWorkset = env.fromElements(Vertex(1L, 0.0))

// Run the delta iteration; the solution set is keyed by "id"
val shortestPaths = vertices.iterateDelta(initialWorkset, 10, Array("id")) {
  (solutionSet, workset) =>

    // Join the workset with the edges to find candidate distances
    val candidates = workset
      .join(edges)
      .where(_.id)
      .equalTo(_.src)
      .apply { (vertex, edge) =>
        Vertex(edge.dst, vertex.distance + edge.weight)
      }

    // Keep only the shortest candidate per target vertex
    val minCandidates = candidates
      .groupBy(_.id)
      .reduce { (a, b) => if (a.distance < b.distance) a else b }

    // Keep only candidates that improve on the current solution
    val updates = minCandidates
      .join(solutionSet)
      .where(_.id)
      .equalTo(_.id)
      .flatMap { case (candidate, current) =>
        if (candidate.distance < current.distance) Some(candidate) else None
      }

    // The updates serve both as the solution set delta (merged into the
    // solution set by key) and as the next workset
    (updates, updates)
}
```

### PageRank Algorithm

```scala
import org.apache.flink.api.scala._

case class Page(id: Long, rank: Double)
case class Link(src: Long, dst: Long)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial page ranks (uniform)
val pages = env.fromElements(
  Page(1L, 1.0),
  Page(2L, 1.0),
  Page(3L, 1.0),
  Page(4L, 1.0)
)

// Page links
val links = env.fromElements(
  Link(1L, 2L),
  Link(1L, 3L),
  Link(2L, 3L),
  Link(3L, 1L),
  Link(3L, 4L),
  Link(4L, 1L)
)

val dampingFactor = 0.85
val numPages = 4

// Out-degree of each page as (pageId, degree)
val outDegrees = links
  .groupBy(_.src)
  .reduceGroup { linkGroup =>
    val linkList = linkGroup.toList
    (linkList.head.src, linkList.length)
  }

// Run the PageRank iteration
val pageRanks = pages.iterate(10) { currentRanks =>

  // Per-page contribution: rank divided by out-degree
  val perPage = currentRanks
    .join(outDegrees)
    .where(_.id)
    .equalTo(_._1)
    .apply { (page, outDegree) =>
      (page.id, page.rank / outDegree._2)
    }

  // Distribute each page's contribution along its outgoing links
  val contributions = perPage
    .join(links)
    .where(_._1)
    .equalTo(_.src)
    .apply { (contrib, link) =>
      (link.dst, contrib._2)
    }

  // Aggregate the contributions and apply the PageRank formula
  val newRanks = contributions
    .groupBy(_._1)
    .reduceGroup { contribs =>
      val contribList = contribs.toList
      val pageId = contribList.head._1
      val totalContrib = contribList.map(_._2).sum
      Page(pageId, (1.0 - dampingFactor) / numPages + dampingFactor * totalContrib)
    }

  newRanks
}
```
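The update rule used above is the standard damped PageRank formula, rank = (1 - d)/N + d · (sum of incoming contributions). A quick worked check with hypothetical contribution values:

```scala
val dampingFactor = 0.85
val numPages = 4
val incoming = List(0.5, 0.25) // hypothetical incoming contributions
val newRank = (1.0 - dampingFactor) / numPages + dampingFactor * incoming.sum
// newRank ≈ 0.0375 + 0.6375 = 0.675
```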

### Connected Components with Delta Iteration

```scala
import org.apache.flink.api.scala._

case class ComponentVertex(id: Long, componentId: Long)

val env = ExecutionEnvironment.getExecutionEnvironment

// Initial vertices (each vertex starts in its own component)
val vertices = env.fromElements(
  ComponentVertex(1L, 1L),
  ComponentVertex(2L, 2L),
  ComponentVertex(3L, 3L),
  ComponentVertex(4L, 4L),
  ComponentVertex(5L, 5L)
)

val edges = env.fromElements(
  (1L, 2L), (2L, 3L), (4L, 5L)
)

// Initial workset: all vertices
val initialWorkset = vertices

val components = vertices.iterateDelta(initialWorkset, 10, Array("id")) {
  (solutionSet, workset) =>

    // Propagate component IDs to neighbors in both edge directions
    val candidates = workset
      .join(edges)
      .where(_.id)
      .equalTo(_._1)
      .apply { (vertex, edge) =>
        ComponentVertex(edge._2, vertex.componentId)
      }
      .union(
        workset
          .join(edges)
          .where(_.id)
          .equalTo(_._2)
          .apply { (vertex, edge) =>
            ComponentVertex(edge._1, vertex.componentId)
          }
      )

    // Keep the minimum candidate component ID per vertex
    val minCandidates = candidates
      .groupBy(_.id)
      .reduce { (a, b) => if (a.componentId < b.componentId) a else b }

    // Keep only candidates that improve on the current solution
    val updates = minCandidates
      .join(solutionSet)
      .where(_.id)
      .equalTo(_.id)
      .flatMap { case (candidate, current) =>
        if (candidate.componentId < current.componentId) Some(candidate)
        else None
      }

    // The updates serve both as the solution set delta and as the next workset
    (updates, updates)
}
```

### K-Means Clustering

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._

case class Point2D(x: Double, y: Double)
case class Centroid(id: Int, x: Double, y: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// Data points
val points = env.fromElements(
  Point2D(1.0, 1.0), Point2D(1.5, 1.5), Point2D(2.0, 2.0),
  Point2D(8.0, 8.0), Point2D(8.5, 8.5), Point2D(9.0, 9.0)
)

// Initial centroids
val initialCentroids = env.fromElements(
  Centroid(0, 0.0, 0.0),
  Centroid(1, 5.0, 5.0)
)

// K-means iteration: the current centroids are broadcast to the
// assignment step in each iteration
val finalCentroids = initialCentroids.iterate(20) { centroids =>

  // Assign each point to its nearest centroid
  val assignments = points
    .map(new RichMapFunction[Point2D, (Int, Point2D, Int)] {
      private var currentCentroids: Seq[Centroid] = _

      override def open(config: Configuration): Unit = {
        currentCentroids = getRuntimeContext
          .getBroadcastVariable[Centroid]("centroids").asScala.toSeq
      }

      override def map(p: Point2D): (Int, Point2D, Int) = {
        val nearest = currentCentroids.minBy { c =>
          (c.x - p.x) * (c.x - p.x) + (c.y - p.y) * (c.y - p.y)
        }
        (nearest.id, p, 1)
      }
    })
    .withBroadcastSet(centroids, "centroids")

  // Average the assigned points to get the new centroids
  val newCentroids = assignments
    .groupBy(_._1)
    .reduceGroup { assigned =>
      val assignmentList = assigned.toList
      val centroidId = assignmentList.head._1
      val pointSum = assignmentList.map(_._2).reduce { (p1, p2) =>
        Point2D(p1.x + p2.x, p1.y + p2.y)
      }
      val count = assignmentList.length
      Centroid(centroidId, pointSum.x / count, pointSum.y / count)
    }

  newCentroids
}
```

## Best Practices

### Performance Considerations

1. **Use Delta Iteration for Sparse Updates**: When only a small fraction of the data changes per iteration
2. **Choose Appropriate Termination Conditions**: Balance accuracy with performance
3. **Optimize Broadcast Variables**: Use broadcast sets for small lookup tables needed inside iterations
4. **Consider Checkpointing**: For long-running iterations, to handle failures

### Common Patterns

1. **Graph Algorithms**: Use delta iteration with a vertex-centric approach
2. **Machine Learning**: Use bulk iteration for gradient-descent-style algorithms
3. **Convergence Detection**: Implement custom termination criteria based on convergence metrics
4. **State Management**: Use the solution set in delta iteration to maintain complete state
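For the convergence-detection pattern, one common approach is to compare successive iterates and stop once the largest change drops below a threshold. A plain-Scala sketch (no Flink involved; the names are illustrative):

```scala
// Generic fixed-point loop: stop when no element moves more than eps
def converge(initial: Vector[Double], maxIter: Int, eps: Double)(
    step: Vector[Double] => Vector[Double]): Vector[Double] = {
  var current = initial
  var done = false
  var i = 0
  while (i < maxIter && !done) {
    val next = step(current)
    val maxChange = current.zip(next).map { case (a, b) => math.abs(a - b) }.max
    done = maxChange < eps
    current = next
    i += 1
  }
  current
}

// Halving every element converges toward zero
val result = converge(Vector(1.0, 2.0), 1000, 1e-9)(v => v.map(_ * 0.5))
```

In a Flink job, the same comparison can be expressed by joining the previous and current iterates and feeding the elements that still changed into the termination DataSet.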