
# Iterative Algorithms

Complete API for implementing custom graph algorithms using different iteration patterns and the algorithm framework.

## Iteration Patterns Overview

Flink Gelly Scala provides three main iteration patterns for implementing graph algorithms:

1. **Scatter-Gather** - Vertices scatter messages along edges, then gather the received messages and update their values
2. **Gather-Sum-Apply (GSA)** - Three-phase pattern: gather information from adjacent vertices and edges, sum it into one aggregate, and apply the aggregate to the vertex value
3. **Vertex-Centric (Pregel)** - Pregel-style iterations in which a single compute function processes incoming messages, updates the vertex value, and sends new messages

## Scatter-Gather Iterations

### Basic Scatter-Gather

```scala { .api }
def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a scatter-gather iteration on the graph without configuration options.

**Parameters:**
- `scatterFunction` - Function that scatters messages from vertices to neighbors
- `gatherFunction` - Function that gathers messages and updates vertex values
- `maxIterations` - Maximum number of iterations to perform

**Returns:** Updated graph after convergence or maximum iterations
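The scatter and gather phases can be illustrated without Flink. The following is a minimal plain-Scala sketch of the superstep loop, applied to single-source shortest paths on an in-memory graph. `ScatterGatherSketch`, `WeightedEdge`, and `shortestPaths` are hypothetical names introduced for illustration; none of them belong to the Gelly API, and the real runtime distributes this work across a cluster.

```scala
object ScatterGatherSketch {
  // Directed weighted edge; an illustrative stand-in for Gelly's Edge[K, EV].
  final case class WeightedEdge(source: Long, target: Long, weight: Double)

  def shortestPaths(
      vertexIds: Set[Long],
      edges: Seq[WeightedEdge],
      sourceId: Long,
      maxIterations: Int): Map[Long, Double] = {
    // The source starts at distance 0, every other vertex at infinity.
    var distances: Map[Long, Double] =
      vertexIds.map(id => id -> (if (id == sourceId) 0.0 else Double.PositiveInfinity)).toMap
    // Only vertices whose value changed in the previous superstep scatter messages.
    var active: Set[Long] = Set(sourceId)
    var superstep = 0
    while (superstep < maxIterations && active.nonEmpty) {
      // Scatter: each active vertex sends (distance + weight) along its out-edges.
      val messages: Map[Long, Seq[Double]] = edges
        .filter(e => active.contains(e.source))
        .map(e => e.target -> (distances(e.source) + e.weight))
        .groupBy(_._1)
        .map { case (target, msgs) => target -> msgs.map(_._2) }
      // Gather: each recipient keeps the minimum and updates only on improvement.
      val updates = messages.collect {
        case (target, msgs) if msgs.min < distances(target) => target -> msgs.min
      }
      distances = distances ++ updates
      active = updates.keySet
      superstep += 1
    }
    distances
  }
}
```

The loop stops when no vertex value changes or when `maxIterations` is reached, which mirrors the "convergence or maximum iterations" behavior described above. For vertices 1 to 4 with edges 1→2 (weight 1.0), 2→3 (weight 2.0), and 1→3 (weight 5.0), starting from vertex 1, the sketch yields distances 0.0, 1.0, and 3.0 for vertices 1, 2, and 3, and leaves the unreachable vertex 4 at infinity.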

### Configured Scatter-Gather

```scala { .api }
def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int,
  parameters: ScatterGatherConfiguration
): Graph[K, VV, EV]
```

Runs a scatter-gather iteration with configuration options.

**Parameters:**
- `scatterFunction` - Message scattering function
- `gatherFunction` - Message gathering and vertex update function
- `maxIterations` - Maximum number of iterations
- `parameters` - Iteration configuration parameters
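A configured call might look like the sketch below. It assumes Flink and Gelly are on the classpath, that `ScatterGatherConfiguration` lives in `org.apache.flink.graph.spargel` and has a no-argument constructor, and that the setters are called as separate statements. `ConfiguredScatterGather`, its `run` helper, the concrete type arguments, the job name, and the parallelism value are all illustrative assumptions.

```scala
import org.apache.flink.graph.scala.Graph
import org.apache.flink.graph.spargel.{GatherFunction, ScatterFunction, ScatterGatherConfiguration}

object ConfiguredScatterGather {
  // Hypothetical helper: vertex, message, and edge types are chosen for illustration only.
  def run(
      graph: Graph[Long, Double, Double],
      scatter: ScatterFunction[Long, Double, Double, Double],
      gather: GatherFunction[Long, Double, Double]): Graph[Long, Double, Double] = {
    val parameters = new ScatterGatherConfiguration
    parameters.setName("Configured scatter-gather") // arbitrary job label
    parameters.setParallelism(4)                    // arbitrary example value
    // Same call as the basic variant, with the configuration as the fourth argument.
    graph.runScatterGatherIteration(scatter, gather, 10, parameters)
  }
}
```

The GSA and vertex-centric variants follow the same shape: build the matching configuration object, set options on it, and pass it as the last argument.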

## Gather-Sum-Apply Iterations

### Basic GSA

```scala { .api }
def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a Gather-Sum-Apply iteration without configuration options.

**Parameters:**
- `gatherFunction` - Collects information about adjacent vertices and edges
- `sumFunction` - Aggregates the gathered information
- `applyFunction` - Updates vertex values with the aggregated data
- `maxIterations` - Maximum number of iterations
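To make the three phases concrete, here is a minimal plain-Scala sketch of Gather-Sum-Apply used for connected components on an in-memory graph. `GsaSketch` and `connectedComponents` are hypothetical names for illustration and are not part of Gelly; edges are treated as undirected, which is an assumption of this sketch.

```scala
object GsaSketch {
  def connectedComponents(
      vertexIds: Set[Long],
      edges: Seq[(Long, Long)],
      maxIterations: Int): Map[Long, Long] = {
    // Treat edges as undirected by adding the reverse direction.
    val neighbors: Map[Long, Seq[Long]] =
      (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
    // Every vertex starts in its own component, labeled by its ID.
    var labels: Map[Long, Long] = vertexIds.map(id => id -> id).toMap
    var changed = true
    var superstep = 0
    while (superstep < maxIterations && changed) {
      val next = labels.map { case (id, current) =>
        // Gather: produce one value per neighbor (its label from the previous superstep).
        val gathered = neighbors.getOrElse(id, Seq.empty).map(labels)
        // Sum: reduce the gathered values pairwise, here with min.
        val summed = gathered.reduceOption((a: Long, b: Long) => math.min(a, b))
        // Apply: adopt the reduced value only if it is smaller than the current label.
        id -> summed.fold(current)(s => math.min(s, current))
      }
      changed = next != labels
      labels = next
      superstep += 1
    }
    labels
  }
}
```

With vertices 1 to 5 and edges (1,2), (2,3), (4,5), vertices 1, 2, and 3 end with label 1 and vertices 4 and 5 with label 4. The sum step must be associative and commutative because the values are combined pairwise in no guaranteed order.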

### Configured GSA

```scala { .api }
def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int,
  parameters: GSAConfiguration
): Graph[K, VV, EV]
```

Runs a GSA iteration with configuration parameters.

**Parameters:**
- `gatherFunction` - Information gathering function
- `sumFunction` - Aggregation function
- `applyFunction` - Vertex update function
- `maxIterations` - Maximum number of iterations
- `parameters` - GSA configuration parameters

## Vertex-Centric Iterations (Pregel)

### Basic Vertex-Centric

```scala { .api }
def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a vertex-centric iteration without configuration options.

**Parameters:**
- `computeFunction` - Vertex compute function that processes messages and updates values
- `combineFunction` - Optional message combiner function; pass `null` to run without a combiner
- `maxIterations` - Maximum number of iterations
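The vertex-centric model can be sketched in plain Scala as a loop in which messages sent in one superstep are delivered in the next. The example below propagates the maximum vertex value through an in-memory graph. `PregelSketch` and `propagateMax` are hypothetical names for illustration, not Gelly API, and the sketch assumes every neighbor ID also appears as a key of `initial`.

```scala
object PregelSketch {
  def propagateMax(
      initial: Map[Long, Int],
      outNeighbors: Map[Long, Seq[Long]],
      maxIterations: Int): Map[Long, Int] = {
    var values = initial
    // In superstep 1 every vertex is active and has no incoming messages.
    var inbox: Map[Long, Seq[Int]] = initial.keys.map(_ -> Seq.empty[Int]).toMap
    var superstep = 1
    while (superstep <= maxIterations && inbox.nonEmpty) {
      val outbox = Seq.newBuilder[(Long, Int)]
      for ((id, messages) <- inbox) {
        // compute(): fold the incoming messages into the vertex value.
        val best = (values(id) +: messages).max
        // Notify neighbors in the first superstep or after the value changed.
        if (superstep == 1 || best > values(id)) {
          outNeighbors.getOrElse(id, Seq.empty).foreach(n => outbox += (n -> best))
        }
        values = values.updated(id, best)
      }
      // Messages sent in this superstep are delivered in the next one;
      // vertices that receive nothing stay inactive.
      inbox = outbox.result().groupBy(_._1).map { case (t, ms) => t -> ms.map(_._2) }
      superstep += 1
    }
    values
  }
}
```

On a four-vertex ring 1→2→3→4→1 with initial values 3, 6, 2, and 1, every vertex ends with the value 6. The iteration ends as soon as no messages are in flight, even if `maxIterations` has not been reached.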

### Configured Vertex-Centric

```scala { .api }
def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int,
  parameters: VertexCentricConfiguration
): Graph[K, VV, EV]
```

Runs a vertex-centric iteration with configuration parameters.

**Parameters:**
- `computeFunction` - Vertex computation function
- `combineFunction` - Message combiner that reduces the messages sent to each vertex
- `maxIterations` - Maximum number of iterations
- `parameters` - Vertex-centric configuration parameters

## Algorithm Framework Integration

### Algorithm Execution

```scala { .api }
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
```

Executes a graph algorithm that implements the `GraphAlgorithm` interface.

**Parameters:**
- `algorithm` - Graph algorithm implementation

**Returns:** Algorithm result of type `T`

### Analytics Execution

```scala { .api }
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```

Executes a graph analytic. Analytics are terminal operations whose results are retrieved via accumulators, allowing composition of multiple analytics and algorithms into a single program.

**Parameters:**
- `analytic` - Graph analytic implementation

**Returns:** The analytic instance for result retrieval

## Function Interfaces

### Scatter Function

```scala { .api }
abstract class ScatterFunction[K, VV, M, EV] {
  def sendMessages(vertex: Vertex[K, VV]): Unit
  def sendMessageTo(targetVertexId: K, message: M): Unit
  def getEdges: Iterable[Edge[K, EV]]
}
```

Base class for scatter functions that send messages from vertices to their neighbors. `getEdges` returns a Java `Iterable` over the edges of the vertex being processed, so Scala code needs a collection converter to use it in a `for` comprehension.

### Gather Function (Scatter-Gather)

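```scala { .api }
abstract class GatherFunction[K, VV, M] {
  def updateVertex(vertex: Vertex[K, VV], inMessages: MessageIterator[M]): Unit
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
}
```

Base class for gather functions in scatter-gather iterations. `updateVertex` receives the vertex together with its incoming messages and calls `setNewVertexValue` when the value should change.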

### Gather Function (GSA)

```scala { .api }
abstract class GatherFunction[VV, EV, M] {
  def gather(neighbor: Neighbor[VV, EV]): M
}
```

Base class for gather functions in GSA iterations. The `Neighbor` argument carries the value of the adjacent vertex and the value of the connecting edge.

### Sum Function

```scala { .api }
abstract class SumFunction[VV, EV, M] {
  def sum(newValue: M, currentValue: M): M
}
```

Base class for sum functions that aggregate gathered information in GSA iterations.

### Apply Function

```scala { .api }
abstract class ApplyFunction[K, VV, M] {
  def apply(newValue: M, currentValue: VV): Unit
  def setResult(result: VV): Unit
  def getSuperstepNumber: Int
}
```

Base class for apply functions that update vertex values in GSA iterations. `apply` does not return the new value; it calls `setResult` when the vertex value should change.

### Compute Function

```scala { .api }
abstract class ComputeFunction[K, VV, EV, M] {
  def compute(vertex: Vertex[K, VV], messages: MessageIterator[M]): Unit
  def sendMessageTo(targetVertexId: K, message: M): Unit
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
  def getEdges: Iterable[Edge[K, EV]]
}
```

Base class for compute functions in vertex-centric iterations. Superstep numbers start at 1, and `getEdges` returns a Java `Iterable` that should be read only once per `compute` call.

### Message Combiner

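```scala { .api }
abstract class MessageCombiner[K, M] {
  def combineMessages(inMessages: MessageIterator[M]): Unit
  def sendCombinedMessage(combinedMessage: M): Unit
}
```

Base class for combining multiple messages sent to the same vertex. `combineMessages` reduces the incoming messages and emits the single result through `sendCombinedMessage`.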
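The effect of a combiner can be sketched in plain Scala: all messages addressed to the same target vertex collapse into one before delivery. `CombinerSketch` and `combineMin` are hypothetical names for illustration and are not part of Gelly; the minimum is just one possible combining operation, suited to shortest-path style algorithms.

```scala
object CombinerSketch {
  // Each outbox entry is (targetVertexId, message).
  // Collapse all messages for the same target into one, keeping the minimum.
  def combineMin(outbox: Seq[(Long, Double)]): Map[Long, Double] =
    outbox.groupBy(_._1).map { case (target, msgs) => target -> msgs.map(_._2).min }
}
```

For an outbox of `(2, 5.0)`, `(2, 3.0)`, `(3, 1.0)` the sketch returns `Map(2 -> 3.0, 3 -> 1.0)`. A combiner changes the result only if the operation is not associative and commutative, so operations such as min, max, and sum are the usual candidates.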

## Configuration Classes

### ScatterGatherConfiguration

```scala { .api }
class ScatterGatherConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}
```

Configuration for scatter-gather iterations. The setters mutate the configuration object and return nothing, so they cannot be chained. `setOptNumVertices` is a flag that makes the total vertex count available inside the iteration.

### GSAConfiguration

```scala { .api }
class GSAConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}
```

Configuration for gather-sum-apply iterations. The setters mutate the configuration object and return nothing.

### VertexCentricConfiguration

```scala { .api }
class VertexCentricConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}
```

Configuration for vertex-centric iterations. The setters mutate the configuration object and return nothing.

## Usage Examples

### Single Source Shortest Path (Scatter-Gather)

```scala
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
import scala.collection.JavaConverters._

// Scatter: send (current distance + edge weight) to each neighbor
class DistanceScatter extends ScatterFunction[Long, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
    // Unreached vertices still hold Double.MaxValue and have nothing useful to send
    if (vertex.getValue < Double.MaxValue) {
      // getEdges returns a Java Iterable, hence the asScala conversion
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}

// Gather: keep the minimum received distance if it improves on the current one
class DistanceGather extends GatherFunction[Long, Double, Double] {
  override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]): Unit = {
    var min = Double.MaxValue
    while (inMessages.hasNext) {
      min = math.min(min, inMessages.next())
    }
    if (min < vertex.getValue) {
      setNewVertexValue(min)
    }
  }
}

// Run the algorithm; `graph` is assumed to be a Graph[Long, Double, Double] whose
// source vertex starts at 0.0 and whose other vertices start at Double.MaxValue
val shortestPaths = graph.runScatterGatherIteration(
  new DistanceScatter(),
  new DistanceGather(),
  maxIterations = 10
)
```

### PageRank (Vertex-Centric)

```scala
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.pregel.{ComputeFunction, MessageIterator}
import scala.collection.JavaConverters._

// numVertices is passed in explicitly rather than read from the compute function
class PageRankCompute(numVertices: Long, maxIterations: Int)
    extends ComputeFunction[Long, Double, Double, Double] {

  override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
    val dampingFactor = 0.85

    // No messages exist in the first superstep, so keep the initial rank there
    val rank = if (getSuperstepNumber == 1) {
      vertex.getValue
    } else {
      var sum = 0.0
      while (messages.hasNext) {
        sum += messages.next()
      }
      (1.0 - dampingFactor) / numVertices + dampingFactor * sum
    }
    setNewVertexValue(rank)

    // Send an equal share of the rank to each neighbor
    if (getSuperstepNumber < maxIterations) {
      // Read the edges once and keep only the target IDs
      val targets = getEdges.asScala.map(_.getTarget).toList
      if (targets.nonEmpty) {
        val msgValue = rank / targets.size
        targets.foreach(target => sendMessageTo(target, msgValue))
      }
    }
  }
}

// Run PageRank; `graph` is assumed to be a Graph[Long, Double, Double]
// whose vertex values hold the initial ranks
val maxIterations = 10
val numVertices = graph.numberOfVertices()
val pageRankResult = graph.runVertexCentricIteration(
  new PageRankCompute(numVertices, maxIterations),
  null, // No message combiner
  maxIterations
)
```

### Connected Components (GSA)

```scala
import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
import org.apache.flink.types.NullValue

// Gather: read the component ID held by each neighbor
class ComponentIdGather extends GatherFunction[Long, NullValue, Long] {
  override def gather(neighbor: Neighbor[Long, NullValue]): Long =
    neighbor.getNeighborValue
}

// Sum: reduce the gathered IDs to the smallest one
class ComponentIdSum extends SumFunction[Long, NullValue, Long] {
  override def sum(newValue: Long, currentValue: Long): Long =
    math.min(newValue, currentValue)
}

// Apply: adopt the smallest neighbor ID when it is lower than the current ID
class ComponentIdApply extends ApplyFunction[Long, Long, Long] {
  override def apply(newValue: Long, currentValue: Long): Unit = {
    if (newValue < currentValue) {
      setResult(newValue)
    }
  }
}

// Run connected components; `graph` is assumed to be a Graph[Long, Long, NullValue]
// in which every vertex value starts as its own ID
val components = graph.runGatherSumApplyIteration(
  new ComponentIdGather(),
  new ComponentIdSum(),
  new ComponentIdApply(),
  maxIterations = 10
)
```

### Using Built-in Algorithms

```scala
import org.apache.flink.graph.library.PageRank

// Use the pre-implemented PageRank algorithm.
// Arguments are positional (damping factor, maximum iterations) because
// Scala cannot pass named arguments to a Java constructor
val pageRankAlgorithm = new PageRank[Long](0.85, 10)
val pageRankResult = graph.run(pageRankAlgorithm)

// The result is a DataSet pairing vertex IDs with PageRank scores.
// first(10) takes 10 elements in no particular order; it does not sort by score
val firstTen = pageRankResult.first(10)
```