
# Graph Algorithms

Iterative computation models for implementing graph algorithms with the scatter-gather, gather-sum-apply, and vertex-centric paradigms, along with a framework for executing algorithms and analytics.

## Capabilities

### Algorithm Execution Framework

Execute pre-built algorithms and analytics on graphs with type-safe result handling.

```scala { .api }
/**
 * Run a graph algorithm on the graph.
 * @param algorithm the algorithm to run on the Graph
 * @return the result of the graph algorithm
 */
def run[T](algorithm: GraphAlgorithm[K, VV, EV, T]): T

/**
 * Run a graph analytic on the graph.
 * A GraphAnalytic is similar to a GraphAlgorithm but is terminal and results
 * are retrieved via accumulators. A Flink program has a single point of
 * execution. A GraphAnalytic defers execution to the user to allow composing
 * multiple analytics and algorithms into a single program.
 * @param analytic the analytic to run on the Graph
 * @return the analytic itself; its result is available once the program has executed
 */
def run[T](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```
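
The difference between the two `run` overloads can be made concrete with a toy model in plain Scala. This is not Flink code. `ToyGraph`, `ToyProgram`, and `ToyAnalytic` are hypothetical names invented for this sketch. The algorithm-style function returns its result immediately. Each analytic only registers work with a shared program and exposes its result, which stands in for an accumulator, after that program has executed once.

```scala
import scala.collection.mutable

object DeferredAnalyticSketch {
  final case class ToyGraph(vertices: Set[Long], edges: Set[(Long, Long)])

  // Stand-in for one Flink program: work is only registered until execute() runs it all once.
  final class ToyProgram {
    private val pending = mutable.ArrayBuffer.empty[() => Unit]
    private var runs = 0
    def register(action: () => Unit): Unit = pending += action
    def execute(): Unit = {
      pending.foreach(action => action())
      pending.clear()
      runs += 1
    }
    def executions: Int = runs
  }

  // Algorithm-style: the result is returned directly to the caller.
  def outDegrees(g: ToyGraph): Map[Long, Int] =
    g.vertices.map(v => v -> g.edges.count(_._1 == v)).toMap

  // Analytic-style: terminal; the result lands in an accumulator-like slot on execute().
  final class ToyAnalytic[T](program: ToyProgram, g: ToyGraph, compute: ToyGraph => T) {
    private var accumulator: Option[T] = None
    program.register(() => accumulator = Some(compute(g)))
    def getResult: T =
      accumulator.getOrElse(throw new IllegalStateException("program has not executed yet"))
  }
}
```

Registering two analytics and calling `execute()` once computes both, which is the kind of composition the Scaladoc describes. Calling `getResult` before execution throws, mirroring that an analytic's result is only available after the program runs.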

### Scatter-Gather Iterations

Iterative computation model with two phases per superstep: each vertex scatters messages along its edges, then each vertex gathers the messages it received and may update its value.

```scala { .api }
/**
 * Runs a scatter-gather iteration on the graph.
 * No configuration options are provided.
 * @param scatterFunction the scatter function
 * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
 * @return the updated Graph after the scatter-gather iteration has converged or
 *         after maxIterations iterations.
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                 gatherFunction: GatherFunction[K, VV, M],
                                 maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a scatter-gather iteration on the graph with configuration options.
 * @param scatterFunction the scatter function
 * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @return the updated Graph after the scatter-gather iteration has converged or
 *         after maxIterations iterations.
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                 gatherFunction: GatherFunction[K, VV, M],
                                 maxIterations: Int,
                                 parameters: ScatterGatherConfiguration): Graph[K, VV, EV]
```
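
The superstep structure of scatter-gather can be sketched in plain Scala for single-source shortest paths. This is an in-memory simulation under its own assumptions, not Gelly's distributed runtime. `ScatterGatherSketch`, `Edge`, and `sssp` are hypothetical names, and edges are directed and weighted. As a simplification, only vertices whose distance changed in the previous superstep scatter, and the source starts as the only active vertex.

```scala
object ScatterGatherSketch {
  final case class Edge(src: Long, dst: Long, weight: Double)

  def sssp(vertices: Set[Long], edges: Seq[Edge], source: Long, maxIterations: Int): Map[Long, Double] = {
    var distances: Map[Long, Double] =
      vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
    // Simplification: only vertices updated in the previous superstep scatter.
    var active: Set[Long] = Set(source)
    var superstep = 0
    while (superstep < maxIterations && active.nonEmpty) {
      // Scatter: each active vertex sends (its distance + edge weight) along every out-edge.
      val inbox: Map[Long, Seq[Double]] = edges
        .filter(e => active.contains(e.src))
        .map(e => e.dst -> (distances(e.src) + e.weight))
        .groupBy(_._1)
        .map { case (dst, msgs) => dst -> msgs.map(_._2) }
      // Gather: each receiving vertex keeps the smallest candidate if it beats its value.
      val updates: Map[Long, Double] = inbox.collect {
        case (v, msgs) if msgs.min < distances(v) => v -> msgs.min
      }
      distances = distances ++ updates
      // Vertices that did not change stay silent; an empty set means convergence.
      active = updates.keySet
      superstep += 1
    }
    distances
  }
}
```

Capping `maxIterations` at 1 stops vertex 3 at the direct-edge distance of 5.0, before it receives the shorter two-hop path of 2.0. This shows why the iteration returns either on convergence or when it runs out of iterations.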

### Gather-Sum-Apply Iterations

Three-phase iterative computation model for graph algorithms that need neighborhood information. In each superstep, gather collects values from a vertex's neighbors and edges, sum aggregates them, and apply updates the vertex value.

```scala { .api }
/**
 * Runs a Gather-Sum-Apply iteration on the graph.
 * No configuration options are provided.
 * @param gatherFunction the gather function collects information about adjacent
 *                       vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @tparam M the intermediate type used between gather, sum and apply
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 *         after maxIterations iterations.
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
                                  sumFunction: SumFunction[VV, EV, M],
                                  applyFunction: ApplyFunction[K, VV, M],
                                  maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
 * @param gatherFunction the gather function collects information about adjacent
 *                       vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @tparam M the intermediate type used between gather, sum and apply
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 *         after maxIterations iterations.
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
                                  sumFunction: SumFunction[VV, EV, M],
                                  applyFunction: ApplyFunction[K, VV, M],
                                  maxIterations: Int,
                                  parameters: GSAConfiguration): Graph[K, VV, EV]
```
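
The three GSA phases can be sketched in plain Scala using connected components as the example:

- **Gather** reads each neighbor's component label.
- **Sum** keeps the minimum label.
- **Apply** adopts that minimum if it is smaller than the vertex's current label.

This is a hypothetical in-memory simulation (`GatherSumApplySketch`, `connectedComponents`), not the Gelly API. Treating the edge list as undirected is an assumption of the sketch.

```scala
object GatherSumApplySketch {
  def connectedComponents(vertices: Set[Long], edges: Seq[(Long, Long)], maxIterations: Int): Map[Long, Long] = {
    // Assumption of this sketch: components are undirected, so add every reverse edge.
    val undirected = edges ++ edges.map { case (a, b) => (b, a) }
    // Each vertex starts in its own component, labelled by its ID.
    var labels: Map[Long, Long] = vertices.map(v => v -> v).toMap
    var iteration = 0
    var changed = true
    while (iteration < maxIterations && changed) {
      // Gather: for every edge (v, n), read neighbor n's current label on behalf of v.
      val gathered: Seq[(Long, Long)] = undirected.map { case (v, n) => v -> labels(n) }
      // Sum: reduce all labels gathered for a vertex with min, an associative operation.
      val summed: Map[Long, Long] = gathered.groupBy(_._1).map {
        case (v, pairs) => v -> pairs.map(_._2).reduce((a: Long, b: Long) => math.min(a, b))
      }
      // Apply: adopt the aggregate only if it is smaller than the current label.
      val updates = summed.filter { case (v, m) => m < labels(v) }
      labels = labels ++ updates
      changed = updates.nonEmpty
      iteration += 1
    }
    labels
  }
}
```

Isolated vertices never receive a gathered value, so they keep their own ID as their component label.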

### Vertex-Centric Iterations

Pregel-style iterative computation model where vertices receive messages, perform computations, and send messages to neighbors.

```scala { .api }
/**
 * Runs a vertex-centric iteration on the graph.
 * No configuration options are provided.
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @return the updated Graph after the vertex-centric iteration has converged or
 *         after maxIterations iterations.
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                 combineFunction: MessageCombiner[K, M],
                                 maxIterations: Int): Graph[K, VV, EV]

/**
 * Runs a vertex-centric iteration on the graph with configuration options.
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @return the updated Graph after the vertex-centric iteration has converged or
 *         after maxIterations iterations.
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                 combineFunction: MessageCombiner[K, M],
                                 maxIterations: Int,
                                 parameters: VertexCentricConfiguration): Graph[K, VV, EV]
```
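
A vertex-centric iteration folds receiving, computing, and sending into one compute function per superstep. The plain-Scala sketch below simulates that model for the classic Pregel example of propagating the maximum value through a graph. It uses a combiner that merges all messages addressed to the same vertex into one. `VertexCentricSketch` and `propagateMax` are hypothetical names, and the sketch runs in memory rather than on Gelly.

```scala
object VertexCentricSketch {
  // Returns the final vertex values and the number of supersteps that ran.
  def propagateMax(initial: Map[Long, Int], edges: Seq[(Long, Long)], maxIterations: Int): (Map[Long, Int], Int) = {
    val outNeighbors: Map[Long, Seq[Long]] =
      edges.groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
    var values = initial
    // In the first superstep every vertex sends its value to its out-neighbors.
    var senders: Set[Long] = initial.keySet
    var superstep = 0
    while (superstep < maxIterations && senders.nonEmpty) {
      // Messages produced by the compute function of each sending vertex.
      val messages: Seq[(Long, Int)] = for {
        v <- senders.toSeq
        n <- outNeighbors.getOrElse(v, Seq.empty[Long])
      } yield n -> values(v)
      // Combiner: merge all messages addressed to the same vertex into a single maximum.
      val combined: Map[Long, Int] =
        messages.groupBy(_._1).map { case (n, ms) => n -> ms.map(_._2).max }
      // Compute: adopt a larger incoming value; only changed vertices send next superstep,
      // the others effectively vote to halt.
      val changed = combined.filter { case (n, m) => m > values(n) }
      values = values ++ changed
      senders = changed.keySet
      superstep += 1
    }
    (values, superstep)
  }
}
```

On the four-vertex chain used in the check below, the iteration ends after three supersteps. The third superstep changes no vertex, so nothing remains to be sent.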

### Graph Validation

Validate graph properties and constraints using custom validators.

```scala { .api }
/**
 * Validate the graph using a GraphValidator.
 * @param validator the validator to apply
 * @return true if the graph is valid according to the validator, false otherwise
 */
def validate(validator: GraphValidator[K, VV, EV]): Boolean
```
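
A validator is simply a predicate over the whole graph. The plain-Scala sketch below checks one common constraint: every edge endpoint must refer to an existing vertex ID. Gelly ships a validator for this check (`InvalidVertexIdsValidator`). `ToyValidator` and `EdgeEndpointsExist` here are hypothetical stand-ins that operate on in-memory collections rather than on a Gelly `Graph`.

```scala
object ValidatorSketch {
  // Hypothetical stand-in for a graph validator: a predicate over the whole graph.
  trait ToyValidator[K] {
    def validate(vertexIds: Set[K], edges: Seq[(K, K)]): Boolean
  }

  // Valid only if both endpoints of every edge are known vertex IDs.
  final class EdgeEndpointsExist[K] extends ToyValidator[K] {
    override def validate(vertexIds: Set[K], edges: Seq[(K, K)]): Boolean =
      edges.forall { case (src, dst) => vertexIds.contains(src) && vertexIds.contains(dst) }
  }
}
```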

## Algorithm Types and Patterns

### Function Interfaces (From Java Gelly)

The iteration methods take user functions that extend these abstract classes from the underlying Java Gelly library. They are shown here in Scala syntax, with the helper methods each one relies on noted in comments:

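```scala { .api }
// Scatter-Gather Functions (org.apache.flink.graph.spargel)
abstract class ScatterFunction[K, VV, M, EV] {
  // Use getEdges and sendMessageTo(target, message) to emit messages.
  def sendMessages(vertex: Vertex[K, VV]): Unit
}

abstract class GatherFunction[K, VV, M] {
  // No return value: call setNewVertexValue(newValue) to update the vertex.
  def updateVertex(vertex: Vertex[K, VV], inMessages: MessageIterator[M]): Unit
}

// Gather-Sum-Apply Functions (org.apache.flink.graph.gsa)
// GSAGatherFunction stands for org.apache.flink.graph.gsa.GatherFunction, renamed here
// to avoid a clash with the scatter-gather GatherFunction.
abstract class GSAGatherFunction[VV, EV, M] {
  def gather(neighbor: Neighbor[VV, EV]): M
}

abstract class SumFunction[VV, EV, M] {
  def sum(arg0: M, arg1: M): M
}

abstract class ApplyFunction[K, VV, M] {
  // No return value: call setResult(newValue) to update the vertex.
  def apply(newValue: M, currentValue: VV): Unit
}

// Vertex-Centric Functions (org.apache.flink.graph.pregel)
abstract class ComputeFunction[K, VV, EV, M] {
  // Use setNewVertexValue, getEdges and sendMessageTo inside compute.
  def compute(vertex: Vertex[K, VV], messages: MessageIterator[M]): Unit
}

abstract class MessageCombiner[K, M] {
  // Combine the messages for one target vertex and emit them via sendCombinedMessage(combined).
  def combineMessages(messages: MessageIterator[M]): Unit
}

// Validation (org.apache.flink.graph.validation); receives the wrapped Java Graph
abstract class GraphValidator[K, VV, EV] {
  def validate(graph: Graph[K, VV, EV]): Boolean
}
```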

### Configuration Types

Each iteration method has an overload that accepts one of these configuration objects to customize the iteration:

```scala { .api }
// Configuration classes (from Java Gelly)
class ScatterGatherConfiguration  // org.apache.flink.graph.spargel
class GSAConfiguration            // org.apache.flink.graph.gsa
class VertexCentricConfiguration  // org.apache.flink.graph.pregel
```

**Usage Examples:**

```scala
import scala.collection.JavaConverters._

import org.apache.flink.api.scala._
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.scala._
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
// The GSA gather function has the same simple name as the scatter-gather one, so rename it.
import org.apache.flink.graph.gsa.{ApplyFunction, Neighbor, SumFunction, GatherFunction => GSAGatherFunction}

// Assumes an existing `graph: Graph[Long, Double, Double]` whose edge values are distances
// (SSSP) or transition probabilities 1 / outDegree(source) (PageRank).

// Example: Single Source Shortest Path using Scatter-Gather
// Assumes the source starts at 0.0 and every other vertex at Double.MaxValue.
class SSSPScatter[K](sourceId: K) extends ScatterFunction[K, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[K, Double]): Unit = {
    // Only the source and already-reached vertices propagate distances.
    if (vertex.getId == sourceId || vertex.getValue < Double.MaxValue) {
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}

class SSSPGather[K] extends GatherFunction[K, Double, Double] {
  override def updateVertex(vertex: Vertex[K, Double],
                            inMessages: MessageIterator[Double]): Unit = {
    var minDistance = vertex.getValue
    while (inMessages.hasNext) {
      val msg = inMessages.next()
      if (msg < minDistance) {
        minDistance = msg
      }
    }
    // Gather functions return nothing; set the new value only when it improves.
    if (minDistance < vertex.getValue) {
      setNewVertexValue(minDistance)
    }
  }
}

// Run SSSP algorithm
val sourceId = 1L
val result = graph.runScatterGatherIteration(new SSSPScatter[Long](sourceId), new SSSPGather[Long], 10)

// Example: PageRank using Gather-Sum-Apply
class PageRankGather extends GSAGatherFunction[Double, Double, Double] {
  // A Neighbor exposes only the neighbor's value and the edge value,
  // so the out-degree must already be folded into the edge value.
  override def gather(neighbor: Neighbor[Double, Double]): Double =
    neighbor.getNeighborValue * neighbor.getEdgeValue
}

class PageRankSum extends SumFunction[Double, Double, Double] {
  override def sum(arg0: Double, arg1: Double): Double = arg0 + arg1
}

class PageRankApply[K](dampingFactor: Double) extends ApplyFunction[K, Double, Double] {
  // Apply functions return nothing; the new value is emitted with setResult.
  override def apply(newValue: Double, currentValue: Double): Unit =
    setResult((1.0 - dampingFactor) + dampingFactor * newValue)
}

// Run PageRank algorithm (ranks must flow along in-edges; check the gather
// direction your Gelly version uses by default)
val pageRankResult = graph.runGatherSumApplyIteration(
  new PageRankGather, new PageRankSum, new PageRankApply[Long](0.85), 10)

// Example: Using built-in algorithms
// Class locations and constructor parameters vary across Flink versions.
import org.apache.flink.graph.library._

val ssspAlgorithm = new SingleSourceShortestPaths[Long, Double](sourceId, 10)
val distances = graph.run(ssspAlgorithm)

val pageRankAlgorithm = new PageRank[Long, Double, Double](0.85, 10)
val pageRankValues = graph.run(pageRankAlgorithm)
```

### Algorithm Design Patterns

#### Scatter-Gather Pattern
Best for algorithms where:
- Vertices need to send information to their neighbors
- Each vertex processes messages from its neighbors independently
- Examples: Single Source Shortest Path, Connected Components

#### Gather-Sum-Apply Pattern
Best for algorithms where:
- You need to collect and aggregate information from neighbors
- The aggregation can be expressed as an associative operation
- Examples: PageRank, Triangle Counting
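
Associativity matters because the sum step may combine partial aggregates that were computed separately, in whatever grouping the runtime chooses. The plain-Scala sketch below (hypothetical `AssociativeSumSketch.partitionedReduce`) pre-aggregates fixed-size chunks and then combines the partial results. Addition gives the same answer for any number of partitions. Subtraction is not associative, so it does not.

```scala
object AssociativeSumSketch {
  // Pre-aggregate each chunk ("partition") locally, then combine the partial results.
  // The result matches a single sequential reduce only if `op` is associative.
  def partitionedReduce(values: Seq[Double], partitions: Int, op: (Double, Double) => Double): Double = {
    require(values.nonEmpty && partitions > 0, "need at least one value and one partition")
    val chunkSize = (values.size + partitions - 1) / partitions
    values.grouped(chunkSize).map(_.reduce(op)).reduce(op)
  }
}
```

With the values 1 to 6, addition yields 21.0 whether the data is split into one partition or three. Subtraction yields -19.0 with one partition but 1.0 with three.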

#### Vertex-Centric Pattern
Best for algorithms where:
- Vertices need fine-grained control over message sending
- Complex message processing is required
- Examples: Graph Coloring, Community Detection

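### Performance Considerations

- **Convergence**: Every iteration method takes a `maxIterations` bound and also stops earlier once the computation converges, that is, when a superstep produces no vertex updates or messages
- **Configuration**: Use the configuration objects to set the iteration name and parallelism, keep the solution set in unmanaged memory, register aggregators and broadcast variables, and control edge direction where the model supports it
- **Message Types**: Choose compact message types (`M`) to minimize serialization overhead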

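- **Partitioning**: Consider graph partitioning strategies for large graphs
- **Fault tolerance**: Gelly runs on Flink's batch DataSet API, where checkpointing (a DataStream feature) does not apply; a failed iterative job is recovered by re-executing work, so size long-running iterations with that cost in mind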