
# Iteration Operations

Support for iterative algorithms including bulk iterations and delta iterations. These operations enable implementation of machine learning algorithms, graph processing, and other iterative computations that require multiple passes over the data.

## Capabilities

### Bulk Iterations

In a bulk iteration, every superstep processes the entire partial solution and produces its next version. The iteration runs until the maximum number of iterations is reached or, if one is supplied, a termination criterion is met.

```java { .api }
/**
 * Create a bulk iteration
 * @param maxIterations maximum number of iterations
 * @return IterativeDataSet for iteration configuration
 */
public IterativeDataSet<T> iterate(int maxIterations);
```

### IterativeDataSet Operations

Operations available on an `IterativeDataSet` for closing a bulk iteration. The data set passed as `iterationResult` becomes the input of the next superstep.

```java { .api }
/**
 * Close the iteration with the iteration result
 * @param iterationResult the result of each iteration step
 * @return DataSet with the final iteration result
 */
public DataSet<T> closeWith(DataSet<T> iterationResult);

/**
 * Close the iteration with result and termination criterion.
 * The iteration stops early in the first superstep in which
 * the termination criterion data set is empty.
 * @param iterationResult the result of each iteration step
 * @param terminationCriterion data set whose emptiness ends the iteration
 * @return DataSet with the final iteration result
 */
public DataSet<T> closeWith(DataSet<T> iterationResult, DataSet<?> terminationCriterion);
```

**Usage Examples:**

```java
// Simple bulk iteration: double a value 10 times (1 -> 2^10 = 1024)
DataSet<Integer> initial = env.fromElements(1);

IterativeDataSet<Integer> loop = initial.iterate(10); // max 10 iterations

DataSet<Integer> doubled = loop
    .map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) {
            return value * 2;
        }
    });

// closeWith is called on the IterativeDataSet, not on the step result
DataSet<Integer> result = loop.closeWith(doubled); // no termination criterion

// Bulk iteration with termination criterion
DataSet<Double> initialValues = env.fromElements(1.0, 2.0, 3.0);

IterativeDataSet<Double> iteration = initialValues.iterate(100);

DataSet<Double> nextValues = iteration
    .map(new MapFunction<Double, Double>() {
        @Override
        public Double map(Double value) {
            return Math.sqrt(value);
        }
    });

// Termination criterion: keep the values still more than 0.001 away from 1.0;
// the iteration stops once this data set is empty
DataSet<Double> terminationCriterion = nextValues
    .filter(new FilterFunction<Double>() {
        @Override
        public boolean filter(Double value) {
            return Math.abs(value - 1.0) > 0.001;
        }
    });

DataSet<Double> finalResult = iteration.closeWith(nextValues, terminationCriterion);
```
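
The stopping rules of the two examples above can be checked without a cluster. The sketch below is plain Java, not the Flink API: `BulkIterationModel`, `iterate`, and `Result` are hypothetical names. It assumes a single process that applies the step function to the whole data set once per superstep and stops after `maxIterations` supersteps, or as soon as the termination criterion yields an empty list.

```java
import java.util.List;
import java.util.function.Function;

// Plain-Java model of bulk-iteration semantics (hypothetical names, not the Flink API).
final class BulkIterationModel {

    /** Final data plus the number of supersteps that actually ran. */
    record Result<T>(List<T> data, int supersteps) {}

    static <T> Result<T> iterate(List<T> initial,
                                 int maxIterations,
                                 Function<List<T>, List<T>> step,
                                 Function<List<T>, List<?>> terminationCriterion) {
        List<T> current = initial;
        int superstep = 0;
        while (superstep < maxIterations) {
            // Each superstep consumes the entire partial solution
            current = step.apply(current);
            superstep++;
            // Stop as soon as the termination criterion produces no elements
            if (terminationCriterion != null && terminationCriterion.apply(current).isEmpty()) {
                break;
            }
        }
        return new Result<>(current, superstep);
    }

    public static void main(String[] args) {
        // Doubling with no termination criterion: all 10 supersteps run
        Result<Integer> powers = iterate(List.of(1), 10,
                xs -> xs.stream().map(x -> x * 2).toList(), null);
        System.out.println(powers); // Result[data=[1024], supersteps=10]

        // Repeated square roots until every value is within 0.001 of 1.0
        Result<Double> roots = iterate(List.of(1.0, 2.0, 3.0), 100,
                xs -> xs.stream().map(Math::sqrt).toList(),
                xs -> xs.stream().filter(v -> Math.abs(v - 1.0) > 0.001).toList());
        System.out.println(roots.supersteps());
    }
}
```

The square-root run ends well before the cap of 100 because the criterion becomes empty once the slowest value (3.0) is close enough to 1.0.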

### Delta Iterations

Delta iterations are more efficient when only a small part of the data changes in each iteration. A delta iteration maintains a solution set (the complete state, indexed by key) and a workset (the elements to process in the next superstep). Each superstep produces a solution set delta, whose elements replace the solution set entries with the same key, and a new workset. The iteration ends when the workset is empty or the maximum number of iterations is reached.

```java { .api }
/**
 * Create a delta iteration; this data set becomes the initial solution set
 * @param workset initial workset (elements to process)
 * @param maxIterations maximum number of iterations
 * @param keyPositions positions of the key fields that identify solution set elements
 * @return DeltaIteration for delta iteration configuration
 */
public <R> DeltaIteration<T, R> iterateDelta(DataSet<R> workset, int maxIterations, int... keyPositions);

/**
 * Create a delta iteration with key selector
 * @param workset initial workset (elements to process)
 * @param maxIterations maximum number of iterations
 * @param keyExtractor function to extract keys for joining
 * @return DeltaIteration for delta iteration configuration
 */
public <K> DeltaIteration<T, ?> iterateDelta(DataSet<?> workset, int maxIterations, KeySelector<T, K> keyExtractor);
```
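
To make the interplay between solution set and workset concrete, here is a plain-Java model of the delta-iteration loop. It is not Flink code: `DeltaIterationModel`, `Step`, and `Result` are hypothetical names. It assumes the semantics described above: the step function reads the current solution set and workset, its delta is merged into the solution set by key (replacing or inserting entries), and the loop ends when the workset is empty or `maxIterations` is reached.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-Java model of delta-iteration semantics (hypothetical names, not Flink code).
final class DeltaIterationModel {

    /** Output of one superstep: changes to the solution set and the next workset. */
    record Step<K, S, W>(Map<K, S> solutionSetDelta, List<W> nextWorkset) {}

    /** Final solution set and the number of supersteps executed. */
    record Result<K, S>(Map<K, S> solutionSet, int supersteps) {}

    static <K, S, W> Result<K, S> iterate(Map<K, S> initialSolutionSet,
                                          List<W> initialWorkset,
                                          int maxIterations,
                                          BiFunction<Map<K, S>, List<W>, Step<K, S, W>> step) {
        Map<K, S> solutionSet = new HashMap<>(initialSolutionSet);
        List<W> workset = initialWorkset;
        int supersteps = 0;
        // Stop when the workset is empty or maxIterations supersteps have run
        while (supersteps < maxIterations && !workset.isEmpty()) {
            Step<K, S, W> out = step.apply(Collections.unmodifiableMap(solutionSet), workset);
            // Merge the delta by key: existing entries are replaced, new keys inserted
            solutionSet.putAll(out.solutionSetDelta());
            workset = out.nextWorkset();
            supersteps++;
        }
        return new Result<>(solutionSet, supersteps);
    }

    public static void main(String[] args) {
        // Countdown: only keys in the workset are touched in each superstep
        Result<String, Integer> r = iterate(
                Map.of("a", 3, "b", 1, "idle", 7),
                List.of("a", "b"),
                100,
                (solution, work) -> {
                    Map<String, Integer> delta = new HashMap<>();
                    List<String> next = new ArrayList<>();
                    for (String key : work) {
                        int value = solution.get(key) - 1;
                        delta.put(key, value);
                        if (value > 0) {
                            next.add(key); // keep working on this key
                        }
                    }
                    return new Step<>(delta, next);
                });
        System.out.println(r.supersteps()); // 3
        System.out.println(r.solutionSet().get("idle")); // 7
    }
}
```

Key `idle` never enters the workset, so it keeps its initial value and is never revisited; that is the efficiency argument for delta iterations when most of the state is stable.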

### DeltaIteration Operations

Operations for configuring delta iterations with solution set updates and workset generation.

```java { .api }
/**
 * Delta iteration class for solution set and workset management
 * @param <ST> solution set element type
 * @param <WT> workset element type
 */
public class DeltaIteration<ST, WT> {

    /**
     * Get the solution set placeholder for join operations
     * @return SolutionSetPlaceHolder representing the solution set
     */
    public SolutionSetPlaceHolder<ST> getSolutionSet();

    /**
     * Get the workset placeholder for transformations
     * @return WorksetPlaceHolder representing the workset
     */
    public WorksetPlaceHolder<WT> getWorkset();

    /**
     * Close the delta iteration
     * @param solutionSetDelta updates to the solution set
     * @param newWorkset new workset for next iteration
     * @return DataSet with final solution set
     */
    public DataSet<ST> closeWith(DataSet<ST> solutionSetDelta, DataSet<WT> newWorkset);
}
```

**Usage Examples:**

```java
// Delta iteration example: Single Source Shortest Paths (unit edge weights)
DataSet<Tuple2<Long, Double>> vertices = env.fromElements(
    new Tuple2<>(1L, 0.0), // source vertex with distance 0
    new Tuple2<>(2L, Double.POSITIVE_INFINITY),
    new Tuple2<>(3L, Double.POSITIVE_INFINITY)
);

DataSet<Tuple2<Long, Long>> edges = env.fromElements(
    new Tuple2<>(1L, 2L), // edge from vertex 1 to vertex 2
    new Tuple2<>(2L, 3L)  // edge from vertex 2 to vertex 3
);

// Initial workset contains only the source vertex
DataSet<Tuple2<Long, Double>> initialWorkset = vertices
    .filter(new FilterFunction<Tuple2<Long, Double>>() {
        @Override
        public boolean filter(Tuple2<Long, Double> vertex) {
            return vertex.f1 == 0.0; // source vertex
        }
    });

// Create delta iteration; the solution set is keyed by vertex ID (field 0)
DeltaIteration<Tuple2<Long, Double>, Tuple2<Long, Double>> iteration =
    vertices.iterateDelta(initialWorkset, 100, 0);

// Get current solution set and workset placeholders
SolutionSetPlaceHolder<Tuple2<Long, Double>> solutionSet = iteration.getSolutionSet();
WorksetPlaceHolder<Tuple2<Long, Double>> workset = iteration.getWorkset();

// Join workset with edges to find neighbors, keeping the shortest candidate per target
DataSet<Tuple3<Long, Long, Double>> candidateUpdates = workset
    .join(edges)
    .where(0).equalTo(0) // join on source vertex ID
    .with(new JoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long>, Tuple3<Long, Long, Double>>() {
        @Override
        public Tuple3<Long, Long, Double> join(
                Tuple2<Long, Double> vertex,
                Tuple2<Long, Long> edge) {
            return new Tuple3<>(edge.f1, vertex.f0, vertex.f1 + 1.0); // neighbor, source, new distance
        }
    })
    .groupBy(0).minBy(2); // at most one candidate per target vertex

// Generate solution set updates (shorter paths found). Records must not be null,
// so a FlatJoinFunction emits an element only when the path improves.
DataSet<Tuple2<Long, Double>> solutionSetUpdates = candidateUpdates
    .join(solutionSet)
    .where(0).equalTo(0) // join on target vertex ID
    .with(new FlatJoinFunction<Tuple3<Long, Long, Double>, Tuple2<Long, Double>, Tuple2<Long, Double>>() {
        @Override
        public void join(
                Tuple3<Long, Long, Double> candidate,
                Tuple2<Long, Double> current,
                Collector<Tuple2<Long, Double>> out) {
            if (candidate.f2 < current.f1) {
                out.collect(new Tuple2<>(candidate.f0, candidate.f2)); // shorter path found
            }
        }
    });

// New workset contains vertices with updated distances
DataSet<Tuple2<Long, Double>> newWorkset = solutionSetUpdates;

// Close the iteration; it ends once no distance improves (empty workset)
DataSet<Tuple2<Long, Double>> shortestPaths = iteration.closeWith(solutionSetUpdates, newWorkset);
```
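
A single-process trace of the same dataflow can confirm the expected distances. The sketch below is plain Java rather than Flink (`DeltaSsspTrace` and `shortestPaths` are hypothetical names). It assumes unit edge weights, as in the example, and mirrors each operator: the workset/edge join with a per-target minimum, the solution set join that emits only improvements, and the by-key merge of the delta.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java trace of the delta-iteration SSSP dataflow (hypothetical names, not Flink code).
final class DeltaSsspTrace {

    static Map<Long, Double> shortestPaths(Map<Long, Double> vertices,
                                           List<long[]> edges,
                                           int maxIterations) {
        // Solution set: vertex ID -> current distance, keyed like iterateDelta(..., 0)
        Map<Long, Double> solutionSet = new HashMap<>(vertices);

        // Initial workset: the source vertex (distance 0.0)
        Map<Long, Double> workset = new HashMap<>();
        for (Map.Entry<Long, Double> v : vertices.entrySet()) {
            if (v.getValue() == 0.0) {
                workset.put(v.getKey(), v.getValue());
            }
        }

        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // workset JOIN edges, then groupBy(target).minBy(distance)
            Map<Long, Double> candidates = new HashMap<>();
            for (long[] edge : edges) {
                Double distance = workset.get(edge[0]);
                if (distance != null) {
                    candidates.merge(edge[1], distance + 1.0, Math::min);
                }
            }

            // candidates JOIN solution set: emit only strictly shorter paths
            Map<Long, Double> delta = new HashMap<>();
            for (Map.Entry<Long, Double> c : candidates.entrySet()) {
                Double current = solutionSet.get(c.getKey());
                if (current != null && c.getValue() < current) {
                    delta.put(c.getKey(), c.getValue());
                }
            }

            solutionSet.putAll(delta); // merge the delta into the solution set by key
            workset = delta;           // improved vertices form the next workset
        }
        return new TreeMap<>(solutionSet); // sorted by vertex ID for readable output
    }

    public static void main(String[] args) {
        Map<Long, Double> vertices = Map.of(
                1L, 0.0,
                2L, Double.POSITIVE_INFINITY,
                3L, Double.POSITIVE_INFINITY);
        List<long[]> edges = List.of(new long[] {1L, 2L}, new long[] {2L, 3L});
        System.out.println(shortestPaths(vertices, edges, 100)); // {1=0.0, 2=1.0, 3=2.0}
    }
}
```

For the three-vertex chain, the third superstep produces no update, the workset becomes empty, and the loop stops long before the 100-iteration cap. Unreachable vertices simply keep their infinite distance.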

### DeltaIterationResultSet

The `DataSet` subclass returned by `DeltaIteration.closeWith(...)`. It is itself the `DataSet<ST>` holding the final solution set, so it is consumed like any other data set; it also keeps a reference to the delta iteration that produced it.

```java { .api }
/**
 * Result set from a delta iteration; a DataSet of the final solution set
 * @param <ST> solution set element type
 * @param <WT> workset element type
 */
public class DeltaIterationResultSet<ST, WT> extends DataSet<ST> {

    /**
     * Get the delta iteration that produced this result
     * @return the DeltaIteration head
     */
    public DeltaIteration<ST, WT> getIterationHead();
}
```

### Iteration Configuration

Additional configuration options for iterations. `DeltaIteration` offers the equivalent `name(String)` and `parallelism(int)` methods.

```java { .api }
/**
 * Set a custom name for the iteration
 * @param name name for the iteration
 * @return configured iteration
 */
public IterativeDataSet<T> name(String name);

/**
 * Set the parallelism for the iteration
 * @param parallelism parallelism level
 * @return configured iteration
 */
public IterativeDataSet<T> setParallelism(int parallelism);
```

### Termination Criteria

Methods for defining when iterations should terminate. Both `IterativeDataSet` and `DeltaIteration` provide them; on an `IterativeDataSet` they return `IterativeDataSet<T>` instead of `DeltaIteration<ST, WT>`. An aggregator combines a global value across all parallel tasks during a superstep; a convergence criterion inspects that value after each superstep and ends the iteration once it reports convergence.

```java { .api }
/**
 * Register aggregator for iteration
 * @param name name of the aggregator
 * @param aggregator the aggregator function
 * @return configured iteration
 */
public <X extends Value> DeltaIteration<ST, WT> registerAggregator(String name, Aggregator<X> aggregator);

/**
 * Register convergence criterion with aggregator
 * @param name name of the convergence aggregator
 * @param aggregator the aggregator function
 * @param convergenceCheck convergence criterion implementation
 * @return configured iteration
 */
public <X extends Value> DeltaIteration<ST, WT> registerAggregationConvergenceCriterion(
        String name,
        Aggregator<X> aggregator,
        ConvergenceCriterion<X> convergenceCheck);
```
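
How a per-superstep aggregate and a convergence check interact can be sketched in plain Java. This is a model, not Flink code: `ConvergenceModel` and its `Criterion` interface are hypothetical stand-ins for Flink's `Aggregator` and `ConvergenceCriterion`, and the sum of absolute changes mirrors the advanced example, which scales each value by the first broadcast parameter (0.1).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleUnaryOperator;

// Plain-Java model of aggregator-based convergence (hypothetical names, not Flink code).
final class ConvergenceModel {

    /** Hypothetical stand-in for a convergence criterion over the superstep's aggregate. */
    interface Criterion {
        boolean isConverged(int superstep, double aggregate);
    }

    record Result(List<Double> data, int supersteps) {}

    static Result iterate(List<Double> initial, int maxIterations,
                          DoubleUnaryOperator step, Criterion criterion) {
        List<Double> current = initial;
        int superstep = 0;
        while (superstep < maxIterations) {
            superstep++;
            double aggregate = 0.0; // the aggregator starts fresh in every superstep
            List<Double> next = new ArrayList<>(current.size());
            for (double value : current) {
                double updated = step.applyAsDouble(value);
                aggregate += Math.abs(updated - value); // behaves like a sum aggregator
                next.add(updated);
            }
            current = next;
            // In Flink this check sees the global aggregate after all tasks finish
            // the superstep; here it runs once per loop pass
            if (criterion.isConverged(superstep, aggregate)) {
                break;
            }
        }
        return new Result(current, superstep);
    }

    public static void main(String[] args) {
        Result r = iterate(List.of(1.0, 2.0), 50, v -> v * 0.1,
                (superstep, change) -> change < 1e-6);
        System.out.println(r.supersteps()); // 8
    }
}
```

Starting from [1.0, 2.0], the summed change shrinks tenfold per superstep (2.7, 0.27, ...), so the check first succeeds in superstep 8, far below the cap of 50. With only `registerAggregator` and no criterion, the aggregate is merely recorded and the iteration runs to its maximum.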

**Advanced Usage Examples:**

```java
// Iteration with a broadcast variable and a registered aggregator
// (initialData is an existing DataSet<Double>)
DataSet<Double> parameters = env.fromElements(0.1, 0.2, 0.3);

// Name and aggregators are configured on the IterativeDataSet itself
IterativeDataSet<Double> loop = initialData
    .iterate(50)
    .name("Gradient Descent")
    .registerAggregator("convergence", new DoubleSumAggregator());

DataSet<Double> step = loop
    .map(new RichMapFunction<Double, Double>() {
        private List<Double> params;
        private DoubleSumAggregator convergenceAgg;

        @Override
        public void open(Configuration config) {
            params = getRuntimeContext().getBroadcastVariable("parameters");
            convergenceAgg = getIterationRuntimeContext().getIterationAggregator("convergence");
        }

        @Override
        public Double map(Double value) {
            double newValue = value * params.get(0); // use broadcast parameter
            convergenceAgg.aggregate(Math.abs(newValue - value)); // track convergence
            return newValue;
        }
    })
    // Broadcast sets attach to the operator that reads them, not to the iteration
    .withBroadcastSet(parameters, "parameters");

DataSet<Double> result = loop.closeWith(step);
```

## Types

```java { .api }
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.DeltaIteration.SolutionSetPlaceHolder;
import org.apache.flink.api.java.operators.DeltaIteration.WorksetPlaceHolder;
import org.apache.flink.api.java.operators.DeltaIterationResultSet;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
```