
# Iterative Processing Models

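Gelly provides three distributed iterative computation models for implementing graph algorithms: vertex-centric (Pregel), scatter-gather, and gather-sum-apply (GSA). All three run as a sequence of synchronized supersteps on a Flink cluster and stop after `maxIterations` supersteps, or earlier once no further work remains. They differ in how the logic of each superstep is divided among user-defined functions and in what each function may read or modify.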

## Capabilities

### Vertex-Centric (Pregel) Model

The vertex-centric model follows Google's Pregel programming paradigm: in each superstep, a user-defined compute function runs at each vertex, processes the messages sent to that vertex in the previous superstep, and can update the vertex value and send messages to other vertices.

```java { .api }
public <M> Graph<K, VV, EV> runVertexCentricIteration(
        ComputeFunction<K, VV, EV, M> computeFunction,
        MessageCombiner<K, M> combiner,
        int maxIterations)

public <M> Graph<K, VV, EV> runVertexCentricIteration(
        ComputeFunction<K, VV, EV, M> computeFunction,
        MessageCombiner<K, M> combiner,
        int maxIterations,
        VertexCentricConfiguration parameters)
```

#### ComputeFunction<K, VV, EV, M>

Defines the computation performed at each vertex in each superstep. Within a single `compute` call, `getEdges()` and `sendMessageToAllNeighbors()` are mutually exclusive and may be used only once, and `setNewVertexValue()` should be called at most once.

```java { .api }
public abstract class ComputeFunction<K, VV, EV, M> implements Serializable {
    public abstract void compute(
            Vertex<K, VV> vertex,
            MessageIterator<M> messages) throws Exception;

    // Message sending and edge access
    public void sendMessageToAllNeighbors(M message)
    public void sendMessageTo(K target, M message)
    public Iterable<Edge<K, EV>> getEdges()

    // Vertex value update
    public void setNewVertexValue(VV newValue)

    // Utility methods
    public long getNumberOfVertices()
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public <T extends Value> T getPreviousIterationAggregate(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```

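#### MessageCombiner<K, M>

Optional combiner that reduces message traffic by merging the messages sent to the same target vertex before they are delivered. Subclasses implement `combineMessages` and emit the merged result with `sendCombinedMessage`. Pass `null` as the combiner argument to run without one.

```java { .api }
public abstract class MessageCombiner<K, M> implements Serializable {
    public abstract void combineMessages(MessageIterator<M> messages) throws Exception;

    // Emits the combined message for the current target vertex
    public final void sendCombinedMessage(M combinedMessage)
}
```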
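
To make the combiner's effect concrete, here is a plain-Java sketch, independent of Flink, of what a minimum combiner does to one batch of outgoing messages: all messages addressed to the same target collapse into a single message. `MinCombinerSketch` and its `Message` record are hypothetical names used only for illustration (Java 16+ for records); Gelly applies the real `MessageCombiner` inside its runtime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of message combining; not Gelly code.
final class MinCombinerSketch {

    // A message addressed to a target vertex (hypothetical helper type)
    record Message(long target, double value) {}

    // Merges all messages bound for the same target into one, keeping the minimum value
    static List<Message> combine(List<Message> outgoing) {
        Map<Long, Double> minByTarget = new TreeMap<>();
        for (Message m : outgoing) {
            minByTarget.merge(m.target(), m.value(), Math::min);
        }
        List<Message> combined = new ArrayList<>();
        for (Map.Entry<Long, Double> e : minByTarget.entrySet()) {
            combined.add(new Message(e.getKey(), e.getValue()));
        }
        return combined;
    }

    public static void main(String[] args) {
        List<Message> outgoing = List.of(
                new Message(2, 5.0), new Message(2, 3.0),
                new Message(3, 7.0), new Message(2, 4.0));
        List<Message> combined = combine(outgoing);
        System.out.println(outgoing.size() + " messages -> " + combined.size() + ": " + combined);
    }
}
```

Keeping only the minimum is safe for shortest-path style algorithms because the receiving vertex would discard every larger candidate anyway.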

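#### MessageIterator<M>

Iterator over the messages delivered to a vertex. It also implements `Iterable`, so it can be consumed with a for-each loop.

```java { .api }
public final class MessageIterator<M> implements Iterator<M>, Iterable<M> {
    public boolean hasNext()
    public M next()
    public void remove()   // not supported; throws UnsupportedOperationException
    public Iterator<M> iterator()
}
```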

#### VertexCentricConfiguration

Configuration options for vertex-centric iterations, passed as the last argument of `runVertexCentricIteration`.

```java { .api }
public class VertexCentricConfiguration extends IterationConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
    public void setOptNumVertices(boolean optNumVertices)
}
```

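**Usage Example:**

```java
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.pregel.ComputeFunction;
import org.apache.flink.graph.pregel.MessageCombiner;
import org.apache.flink.graph.pregel.MessageIterator;

// Single Source Shortest Paths from vertex 1 using a vertex-centric iteration.
// Assumes `graph` is a Graph<Long, Double, Double> whose vertex values are all
// Double.POSITIVE_INFINITY and whose edge values are non-negative distances.
public static final class SSSPComputeFunction extends ComputeFunction<Long, Double, Double, Double> {
    @Override
    public void compute(Vertex<Long, Double> vertex,
                        MessageIterator<Double> messages) throws Exception {

        double minDistance = vertex.getId().equals(1L) ? 0.0 : Double.POSITIVE_INFINITY;

        // Take the minimum over all candidate distances received
        while (messages.hasNext()) {
            minDistance = Math.min(minDistance, messages.next());
        }

        // If a shorter distance was found, store it and propagate it
        if (minDistance < vertex.getValue()) {
            setNewVertexValue(minDistance);
            // Send updated distance + edge weight to every out-neighbor
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), minDistance + edge.getValue());
            }
        }
    }
}

// Keeps only the smallest candidate distance per target vertex
public static final class MinMessageCombiner extends MessageCombiner<Long, Double> {
    @Override
    public void combineMessages(MessageIterator<Double> messages) throws Exception {
        double min = Double.POSITIVE_INFINITY;
        while (messages.hasNext()) {
            min = Math.min(min, messages.next());
        }
        sendCombinedMessage(min);
    }
}

// Run the algorithm
Graph<Long, Double, Double> result = graph.runVertexCentricIteration(
        new SSSPComputeFunction(),
        new MinMessageCombiner(),
        maxIterations);
```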
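
The superstep semantics behind the example can be simulated on a single machine. The sketch below is not Gelly code: it assumes an in-memory edge list, uses a hypothetical `PregelSsspSketch` class, takes the source vertex as a parameter instead of hard-coding ID 1, and follows the classic Pregel rule that every vertex runs in the first superstep while later supersteps only run vertices that received messages. Messages sent in one superstep are delivered in the next, and the loop ends when no messages remain or `maxIterations` is reached.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-machine sketch of vertex-centric (Pregel-style) supersteps for SSSP.
// Not Gelly code: no distribution, serialization, or fault tolerance.
final class PregelSsspSketch {

    // Hypothetical weighted edge type used only by this sketch
    record Edge(int source, int target, double weight) {}

    static double[] shortestPaths(int numVertices, List<Edge> edges, int sourceId, int maxIterations) {
        double[] distance = new double[numVertices];
        Arrays.fill(distance, Double.POSITIVE_INFINITY);

        // Group outgoing edges by source vertex
        Map<Integer, List<Edge>> outEdges = new HashMap<>();
        for (Edge e : edges) {
            outEdges.computeIfAbsent(e.source(), k -> new ArrayList<>()).add(e);
        }

        // In the first superstep every vertex runs, with an empty inbox
        Map<Integer, List<Double>> inbox = new HashMap<>();
        for (int v = 0; v < numVertices; v++) {
            inbox.put(v, new ArrayList<>());
        }

        for (int superstep = 1; superstep <= maxIterations && !inbox.isEmpty(); superstep++) {
            // Messages sent in this superstep are delivered in the next one
            Map<Integer, List<Double>> nextInbox = new HashMap<>();
            for (Map.Entry<Integer, List<Double>> entry : inbox.entrySet()) {
                int v = entry.getKey();
                // Same per-vertex logic as SSSPComputeFunction.compute
                double minDistance = (v == sourceId) ? 0.0 : Double.POSITIVE_INFINITY;
                for (double msg : entry.getValue()) {
                    minDistance = Math.min(minDistance, msg);
                }
                if (minDistance < distance[v]) {
                    distance[v] = minDistance;
                    for (Edge e : outEdges.getOrDefault(v, List.of())) {
                        nextInbox.computeIfAbsent(e.target(), k -> new ArrayList<>())
                                 .add(minDistance + e.weight());
                    }
                }
            }
            inbox = nextInbox;  // an empty inbox means no work is left, ending the loop
        }
        return distance;
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(
                new Edge(0, 1, 4.0), new Edge(0, 2, 1.0),
                new Edge(2, 1, 2.0), new Edge(1, 3, 1.0));
        System.out.println(Arrays.toString(shortestPaths(5, edges, 0, 10)));
    }
}
```

In this input, vertex 1 first receives the direct distance 4.0 and is corrected to 3.0 one superstep later via vertex 2; vertex 4 has no incoming edges and stays at infinity. Capping `maxIterations` at 2 stops the propagation before that correction arrives.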

### Scatter-Gather Model

The scatter-gather model splits each superstep into two user-defined functions: in the scatter phase, each vertex sends messages to other vertices; in the gather phase, each vertex that received messages can update its value. Scatter functions cannot modify vertex values, and gather functions cannot send messages.

```java { .api }
public <M> Graph<K, VV, EV> runScatterGatherIteration(
        ScatterFunction<K, VV, M, EV> scatterFunction,
        GatherFunction<K, VV, M> gatherFunction,
        int maxIterations)

public <M> Graph<K, VV, EV> runScatterGatherIteration(
        ScatterFunction<K, VV, M, EV> scatterFunction,
        GatherFunction<K, VV, M> gatherFunction,
        int maxIterations,
        ScatterGatherConfiguration parameters)
```

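#### ScatterFunction<K, VV, M, EV>

Defines which messages each vertex sends in the scatter phase. Messages are emitted with `sendMessageTo` or `sendMessageToAllNeighbors`; `getEdges()` and `sendMessageToAllNeighbors()` are mutually exclusive and may be used only once per `sendMessages` call.

```java { .api }
public abstract class ScatterFunction<K, VV, M, EV> implements Serializable {
    public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;

    // Message sending and access to outgoing edges
    public void sendMessageTo(K target, M message)
    public void sendMessageToAllNeighbors(M message)
    public Iterable<Edge<K, EV>> getEdges()

    // Utility methods
    public long getNumberOfVertices()   // -1 unless setOptNumVertices(true)
    public long getInDegree()           // -1 unless setOptDegrees(true)
    public long getOutDegree()          // -1 unless setOptDegrees(true)
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public <T extends Value> T getPreviousIterationAggregate(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```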

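#### GatherFunction<K, VV, M>

Defines how a vertex value is updated from its incoming messages in the gather phase. `updateVertex` returns nothing; the vertex is updated only if the function calls `setNewVertexValue`, at most once per invocation.

```java { .api }
public abstract class GatherFunction<K, VV, M> implements Serializable {
    public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<M> inMessages) throws Exception;

    // Vertex value update
    public void setNewVertexValue(VV newValue)

    // Utility methods
    public long getNumberOfVertices()   // -1 unless setOptNumVertices(true)
    public long getInDegree()           // -1 unless setOptDegrees(true)
    public long getOutDegree()          // -1 unless setOptDegrees(true)
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public <T extends Value> T getPreviousIterationAggregate(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```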

#### ScatterGatherConfiguration

Configuration options for scatter-gather iterations.

```java { .api }
public class ScatterGatherConfiguration extends IterationConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
    public void setOptNumVertices(boolean optNumVertices)
    public void setOptDegrees(boolean optDegrees)
}
```

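**Usage Example:**

```java
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.types.NullValue;

// PageRank using scatter-gather: each vertex splits its rank evenly across its out-edges
public static final class PageRankScatter extends ScatterFunction<Long, Double, Double, NullValue> {
    @Override
    public void sendMessages(Vertex<Long, Double> vertex) {
        // getOutDegree() requires setOptDegrees(true); otherwise it returns -1
        long outDegree = getOutDegree();
        if (outDegree > 0) {
            double rankToSend = vertex.getValue() / outDegree;
            // getEdges() may only be called once per sendMessages() invocation
            for (Edge<Long, NullValue> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), rankToSend);
            }
        }
    }
}

public static final class PageRankGather extends GatherFunction<Long, Double, Double> {
    private static final double DAMPING_FACTOR = 0.85;

    @Override
    public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) {
        double sum = 0.0;
        while (inMessages.hasNext()) {
            sum += inMessages.next();
        }
        // getNumberOfVertices() requires setOptNumVertices(true); otherwise it returns -1
        setNewVertexValue((1.0 - DAMPING_FACTOR) / getNumberOfVertices() + DAMPING_FACTOR * sum);
    }
}

// Enable the vertex count and degree information used above
ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
parameters.setOptNumVertices(true);
parameters.setOptDegrees(true);

// Run PageRank
Graph<Long, Double, NullValue> result = graph.runScatterGatherIteration(
        new PageRankScatter(),
        new PageRankGather(),
        maxIterations,
        parameters);
```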
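
The two phases can be traced with a single-machine sketch, assuming ranks start at 1/N and skipping the dangling-vertex handling a full PageRank would need. It is plain Java rather than Gelly: the hypothetical `ScatterGatherPageRankSketch` sums messages into an array instead of delivering a `MessageIterator`, but it keeps the key property that the gather phase only sees messages computed from the previous superstep's ranks.

```java
import java.util.Arrays;

// Single-machine sketch of scatter-gather PageRank; not Gelly code.
final class ScatterGatherPageRankSketch {

    static final double DAMPING_FACTOR = 0.85;

    // edges[i] = {source, target}; every rank starts at 1/N
    static double[] pageRank(int numVertices, int[][] edges, int iterations) {
        int[] outDegree = new int[numVertices];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }
        double[] rank = new double[numVertices];
        Arrays.fill(rank, 1.0 / numVertices);

        for (int superstep = 1; superstep <= iterations; superstep++) {
            // Scatter phase: send rank / outDegree along every out-edge.
            // Vertex values are only read here, never modified.
            double[] inbox = new double[numVertices];
            boolean[] received = new boolean[numVertices];
            for (int[] e : edges) {
                inbox[e[1]] += rank[e[0]] / outDegree[e[0]];
                received[e[1]] = true;
            }
            // Gather phase: only vertices that received messages update their value
            for (int v = 0; v < numVertices; v++) {
                if (received[v]) {
                    rank[v] = (1.0 - DAMPING_FACTOR) / numVertices + DAMPING_FACTOR * inbox[v];
                }
            }
        }
        return rank;
    }

    public static void main(String[] args) {
        // A 3-cycle: 0 -> 1 -> 2 -> 0
        int[][] edges = {{0, 1}, {1, 2}, {2, 0}};
        System.out.println(Arrays.toString(pageRank(3, edges, 20)));
    }
}
```

On a cycle every vertex keeps rank 1/3; when every vertex has an out-edge and receives at least one message, the ranks keep summing to 1.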

### Gather-Sum-Apply (GSA) Model

The GSA model splits each superstep into three phases: gather computes a partial value for each edge from the neighbor's value and the edge value, sum reduces all partial values of a vertex to a single value, and apply combines that value with the current vertex value to produce the new one.

```java { .api }
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
        GatherFunction<VV, EV, M> gatherFunction,
        SumFunction<VV, EV, M> sumFunction,
        ApplyFunction<K, VV, M> applyFunction,
        int maxIterations)

public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
        GatherFunction<VV, EV, M> gatherFunction,
        SumFunction<VV, EV, M> sumFunction,
        ApplyFunction<K, VV, M> applyFunction,
        int maxIterations,
        GSAConfiguration parameters)
```

#### GatherFunction<VV, EV, M>

Computes a partial value from a single neighbor: the neighbor's vertex value and the value of the connecting edge. This class lives in the `org.apache.flink.graph.gsa` package and is distinct from the scatter-gather `GatherFunction`.

```java { .api }
public abstract class GatherFunction<VV, EV, M> implements Serializable {
    public abstract M gather(Neighbor<VV, EV> neighbor) throws Exception;

    // Utility methods
    public long getNumberOfVertices()
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```

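#### SumFunction<VV, EV, M>

Combines two gathered values into one and is applied pairwise until a single value remains per vertex. Because partial values may be combined in any order, the function should be associative and commutative (for example, `min`, `max`, or addition).

```java { .api }
public abstract class SumFunction<VV, EV, M> implements Serializable {
    public abstract M sum(M value1, M value2) throws Exception;
}
```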
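
Assuming, as with any distributed reduce, that partial values from different partitions may be combined in different groupings, the effect of choosing a sum function can be shown without Flink. The hypothetical `SumOrderSketch` reduces the same gathered values left to right and as two halves: `min` gives the same answer either way, while subtraction does not.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java illustration, not Gelly code: the same values reduced with two groupings.
final class SumOrderSketch {

    // ((a op b) op c) op d
    static long leftFold(List<Long> values, BinaryOperator<Long> op) {
        long acc = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            acc = op.apply(acc, values.get(i));
        }
        return acc;
    }

    // (a op b) op (c op d): each half reduced separately, then combined
    static long splitFold(List<Long> values, BinaryOperator<Long> op) {
        int mid = values.size() / 2;
        long left = leftFold(values.subList(0, mid), op);
        long right = leftFold(values.subList(mid, values.size()), op);
        return op.apply(left, right);
    }

    public static void main(String[] args) {
        List<Long> gathered = List.of(7L, 3L, 9L, 5L);
        BinaryOperator<Long> min = Math::min;          // associative and commutative
        BinaryOperator<Long> minus = (a, b) -> a - b;  // neither
        System.out.println("min:   " + leftFold(gathered, min) + " vs " + splitFold(gathered, min));
        System.out.println("minus: " + leftFold(gathered, minus) + " vs " + splitFold(gathered, minus));
    }
}
```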

#### ApplyFunction<K, VV, M>

Receives the summed value and the current vertex value. `apply` returns nothing; the vertex is updated only if the function calls `setResult`.

```java { .api }
public abstract class ApplyFunction<K, VV, M> implements Serializable {
    public abstract void apply(M newValue, VV currentValue) throws Exception;

    // Sets the new vertex value
    public void setResult(VV result)

    // Utility methods
    public long getNumberOfVertices()
    public int getSuperstepNumber()
    public <T extends Aggregator<?>> T getIterationAggregator(String name)
    public void preSuperstep() throws Exception
    public void postSuperstep() throws Exception
}
```

#### Neighbor<VV, EV>

Represents a neighbor vertex and the connecting edge in GSA iterations.

```java { .api }
public class Neighbor<VV, EV> {
    public VV getNeighborValue()
    public EV getEdgeValue()
}
```

#### GSAConfiguration

Configuration options for GSA iterations.

```java { .api }
public class GSAConfiguration extends IterationConfiguration {
    public void setName(String name)
    public void setParallelism(int parallelism)
    public void setSolutionSetUnmanagedMemory(boolean unmanaged)
    public void registerAggregator(String name, Aggregator<?> aggregator)
    public void setOptNumVertices(boolean optNumVertices)
    public void setDirection(EdgeDirection direction)
}
```

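**Usage Example:**

```java
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.types.NullValue;

// Connected Components using GSA.
// Assumes each vertex value is initialized to its own ID and that every edge is
// stored in both directions (for example via graph.getUndirected()).
public static final class CCGather extends GatherFunction<Long, NullValue, Long> {
    @Override
    public Long gather(Neighbor<Long, NullValue> neighbor) {
        // Candidate label: the neighbor's current component ID
        return neighbor.getNeighborValue();
    }
}

public static final class CCSum extends SumFunction<Long, NullValue, Long> {
    @Override
    public Long sum(Long value1, Long value2) {
        // Keep the smallest candidate label
        return Math.min(value1, value2);
    }
}

public static final class CCApply extends ApplyFunction<Long, Long, Long> {
    @Override
    public void apply(Long newValue, Long currentValue) {
        // Update only when a smaller label arrives
        if (newValue < currentValue) {
            setResult(newValue);
        }
    }
}

// Run Connected Components
Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(
        new CCGather(),
        new CCSum(),
        new CCApply(),
        maxIterations);
```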
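
The three phases can be traced on a small graph with a plain-Java sketch. The hypothetical `GsaConnectedComponentsSketch` mirrors `CCGather`, `CCSum`, and `CCApply`, treats each edge as undirected, and stops once an apply phase updates no vertex. As a simplification, it recomputes the gather phase over all edges in every superstep.

```java
import java.util.Arrays;

// Single-machine sketch of gather-sum-apply supersteps for connected components; not Gelly code.
final class GsaConnectedComponentsSketch {

    // edges[i] = {u, v}; returns the smallest reachable vertex ID per vertex
    static long[] components(int numVertices, int[][] edges, int maxIterations) {
        long[] label = new long[numVertices];
        for (int v = 0; v < numVertices; v++) {
            label[v] = v;  // each vertex starts in its own component
        }
        for (int superstep = 1; superstep <= maxIterations; superstep++) {
            Long[] summed = new Long[numVertices];  // null = nothing gathered for this vertex
            for (int[] e : edges) {
                // Visit each edge in both directions: {receiving vertex, neighbor}
                for (int[] dir : new int[][] {{e[0], e[1]}, {e[1], e[0]}}) {
                    long gathered = label[dir[1]];  // gather: the neighbor's value
                    int v = dir[0];
                    // sum: keep the minimum gathered value
                    summed[v] = (summed[v] == null) ? gathered : Math.min(summed[v], gathered);
                }
            }
            // apply: adopt the summed value only if it is smaller than the current label
            boolean anyChanged = false;
            for (int v = 0; v < numVertices; v++) {
                if (summed[v] != null && summed[v] < label[v]) {
                    label[v] = summed[v];
                    anyChanged = true;
                }
            }
            if (!anyChanged) {
                break;  // no vertex was updated, so the iteration has converged
            }
        }
        return label;
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {1, 2}, {3, 4}};
        System.out.println(Arrays.toString(components(6, edges, 10)));
    }
}
```

Here vertices 0 to 2 end with label 0, vertices 3 and 4 with label 3, and the isolated vertex 5 keeps its own ID; vertex 2 needs two supersteps to receive label 0.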

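## Common Patterns

### Convergence Detection

Every iteration ends after `maxIterations` supersteps, or earlier once a superstep produces no further work (no messages or vertex updates to process). For custom convergence criteria, functions can update iteration aggregators, for example a count of vertices whose value changed, and read the value aggregated in the previous superstep with `getPreviousIterationAggregate`:

```java
// Inside a compute/scatter/gather/apply function
LongSumAggregator changedVertices = getIterationAggregator("changed");
if (valueChanged) {
    changedVertices.aggregate(1L);
}

// Value aggregated during the previous superstep
LongValue changedLastSuperstep = getPreviousIterationAggregate("changed");

// Register the aggregator on the iteration configuration before running the iteration
configuration.registerAggregator("changed", new LongSumAggregator());
```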
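
The pattern can be sketched without Flink. The hypothetical `SumAggregator` below mimics a `LongSumAggregator`-style counter that is reset at each superstep boundary, with the finished value readable in the following superstep; the loop stops once the previous superstep reports that no vertex changed. Returning `null` before the first superstep has finished is a choice made for this sketch, not documented Gelly behavior, and the halving update is just a stand-in computation.

```java
import java.util.Arrays;

// Plain-Java sketch of convergence detection with a per-superstep sum aggregator; not Gelly code.
final class AggregatorConvergenceSketch {

    // Hypothetical stand-in for a LongSumAggregator registered under "changed"
    static final class SumAggregator {
        private long current;
        private Long previous;  // null until one superstep has finished (sketch assumption)

        void aggregate(long value) { current += value; }

        Long getPreviousAggregate() { return previous; }

        // Called by the simulated runtime between supersteps
        void endSuperstep() {
            previous = current;
            current = 0;
        }
    }

    // Each superstep halves every value (integer division) and counts the vertices that changed.
    // Returns the number of supersteps that actually ran.
    static int run(long[] values, int maxIterations) {
        SumAggregator changed = new SumAggregator();
        int superstep = 1;
        for (; superstep <= maxIterations; superstep++) {
            Long changedLast = changed.getPreviousAggregate();
            if (changedLast != null && changedLast == 0L) {
                break;  // converged: nothing changed in the previous superstep
            }
            for (int v = 0; v < values.length; v++) {
                long updated = values[v] / 2;
                if (updated != values[v]) {
                    values[v] = updated;
                    changed.aggregate(1L);
                }
            }
            changed.endSuperstep();
        }
        return superstep - 1;
    }

    public static void main(String[] args) {
        long[] values = {8, 3, 0};
        int supersteps = run(values, 100);
        System.out.println(supersteps + " supersteps, values = " + Arrays.toString(values));
    }
}
```

For `{8, 3, 0}`, the fifth superstep is the first in which nothing changes, so the check at the start of the sixth ends the loop.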

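### Performance Optimization

- **Message combiners**: In the vertex-centric model, a `MessageCombiner` merges messages bound for the same vertex before they are delivered, reducing network traffic.
- **Degree information**: In scatter-gather iterations, `setOptDegrees(true)` makes `getInDegree()` and `getOutDegree()` available; without it they return -1.
- **Solution set memory**: By default the solution set is kept in Flink's managed memory in serialized form; `setSolutionSetUnmanagedMemory(true)` keeps it as a plain object map instead.
- **Parallelism**: `setParallelism(int)` sets the parallelism of the iteration operators.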

### Error Handling

Exceptions thrown from user-defined functions propagate to the Flink runtime and cause the job to fail:

```java
@Override
public void compute(Vertex<Long, Double> vertex, MessageIterator<Double> messages)
        throws Exception {

    if (vertex.getValue() < 0) {
        throw new IllegalArgumentException("Negative vertex value: " + vertex.getValue());
    }
    // ... computation logic
}
```