# Apache Flink Gelly Scala API

## Overview

Apache Flink Gelly Scala is a graph processing library that provides a high-level Scala API for distributed graph analytics on Apache Flink. It lets developers build scalable graph processing applications in a functional Scala style, with support for vertex-centric, scatter-gather, and gather-sum-apply iterations, along with operations for graph construction, transformation, analytics, mutation, and joins.

## Package Information

- **Package Name**: flink-gelly-scala_2.10
- **Package Type**: maven
- **Language**: Scala 2.10
- **Installation**: Add as a dependency in your Maven `pom.xml`

## Installation

Add to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-gelly-scala_2.10</artifactId>
  <version>1.3.3</version>
</dependency>
```

## Core Imports

```scala
import org.apache.flink.graph.scala._
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
```

## Basic Usage

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create vertices and edges
val vertices = env.fromCollection(List(
  new Vertex(1L, "A"),
  new Vertex(2L, "B"),
  new Vertex(3L, "C")
))

val edges = env.fromCollection(List(
  new Edge(1L, 2L, 1.0),
  new Edge(2L, 3L, 2.0)
))

// Create graph
val graph = Graph.fromDataSet(vertices, edges, env)

// Basic operations
val vertexCount = graph.numberOfVertices()
val degrees = graph.getDegrees()
```

## Architecture

The Flink Gelly Scala library consists of several key components:

- **Graph Class**: The main graph representation with type-safe operations
- **Graph Factory Methods**: Multiple ways to create graphs from various data sources
- **Graph Transformations**: Methods for mapping, filtering, and transforming graphs
- **Graph Analytics**: Built-in algorithms and metrics calculations
- **Iterative Processing**: Support for custom graph algorithms using different iteration patterns

## Core Types

```scala { .api }
// Main graph type with key (K), vertex value (VV), and edge value (EV) type parameters
final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]

// Core graph elements from Flink Gelly Java API
class Vertex[K, VV](id: K, value: VV)
class Edge[K, EV](source: K, target: K, value: EV)
class Triplet[K, VV, EV](srcVertexId: K, trgVertexId: K, srcVertexValue: VV, trgVertexValue: VV, edgeValue: EV)
```
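
The element types above can be exercised without running a Flink job. The following is a minimal sketch, assuming the Gelly Java classes are on the classpath; the object name `CoreTypesSketch` and the sample values are illustrative only.

```scala
import org.apache.flink.graph.{Edge, Vertex}

// Hypothetical holder object, used only to group the sample values
object CoreTypesSketch {
  // A vertex pairs an ID with a value; an edge pairs a source ID, a target ID, and a value
  val vertex = new Vertex[Long, String](1L, "A")
  val edge = new Edge[Long, Double](1L, 2L, 0.5)

  // Accessors defined on the Java classes
  val vertexId: Long = vertex.getId
  val vertexValue: String = vertex.getValue
  val endpoints: (Long, Long) = (edge.getSource, edge.getTarget)

  // reverse() returns a new edge with source and target swapped
  val reversed: Edge[Long, Double] = edge.reverse()
}
```

Because `Vertex` and `Edge` are Java classes, their constructors take positional arguments only.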

## Graph Construction

Create graphs from various data sources including DataSets, collections, tuples, and CSV files.

### Basic Construction

```scala { .api }
// From DataSets
def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromDataSet[K, EV](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromDataSet[K, VV, EV](edges: DataSet[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV]

// From Collections
def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromCollection[K, EV](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
```

### Tuple-based Construction

```scala { .api }
// From Tuples - convenient for creating graphs from structured data
def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromTupleDataSet[K, EV](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromTuple2DataSet[K](edges: DataSet[(K, K)], env: ExecutionEnvironment): Graph[K, NullValue, NullValue]

// From CSV Files - extensive configuration for file-based graph creation
def fromCsvReader[K, VV, EV](env: ExecutionEnvironment, pathEdges: String, ...): Graph[K, VV, EV]
```
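
The factory methods above might be combined as in the sketch below, which builds the same small graph from tuple DataSets and from an edge list alone. It assumes flink-gelly-scala 1.3.x on the classpath and a local execution environment; `GraphConstructionSketch`, `vertexCounts`, and the sample data are hypothetical names chosen for illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._
import org.apache.flink.types.NullValue

// Hypothetical example object, not part of the library
object GraphConstructionSketch {
  // Builds the same three-vertex graph in two ways and returns both vertex counts
  def vertexCounts(): (Long, Long) = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Vertices as (id, value) tuples, edges as (source, target, value) tuples
    val vertexTuples: DataSet[(Long, String)] =
      env.fromElements((1L, "A"), (2L, "B"), (3L, "C"))
    val edgeTuples: DataSet[(Long, Long, Double)] =
      env.fromElements((1L, 2L, 1.0), (2L, 3L, 2.0))
    val fromTuples: Graph[Long, String, Double] =
      Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

    // Edges only: vertex IDs come from the edges and vertex values are NullValue
    val edgeList = List(new Edge[Long, Double](1L, 2L, 1.0), new Edge[Long, Double](2L, 3L, 2.0))
    val edgesOnly: Graph[Long, NullValue, Double] = Graph.fromCollection(edgeList, env)

    (fromTuples.numberOfVertices(), edgesOnly.numberOfVertices())
  }

  def main(args: Array[String]): Unit = println(vertexCounts())
}
```

The edge-only variant is convenient when vertex values are irrelevant or will be filled in later by a transformation or join.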

[Complete Graph Construction Documentation](./graph-construction.md)

## Graph Operations

Access graph data and perform basic transformations.

### Data Access

```scala { .api }
// Access graph components
def getVertices: DataSet[Vertex[K, VV]]
def getEdges: DataSet[Edge[K, EV]]
def getVerticesAsTuple2(): DataSet[(K, VV)]
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
def getTriplets(): DataSet[Triplet[K, VV, EV]]

// Graph metrics
def numberOfVertices(): Long
def numberOfEdges(): Long
def getVertexIds(): DataSet[K]
def getEdgeIds(): DataSet[(K, K)]
```
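
As a rough illustration of the accessors above, the sketch below collects vertices, edge IDs, and triplets from a small graph into local collections. It assumes flink-gelly-scala 1.3.x and a local execution environment; `DataAccessSketch` and `collectAll` are hypothetical names.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Hypothetical example object, not part of the library
object DataAccessSketch {
  // Returns (vertex tuples, edge ID pairs, one description per triplet), collected locally
  def collectAll(): (Seq[(Long, String)], Seq[(Long, Long)], Seq[String]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = List(
      new Vertex[Long, String](1L, "A"),
      new Vertex[Long, String](2L, "B"),
      new Vertex[Long, String](3L, "C"))
    val edges = List(
      new Edge[Long, Double](1L, 2L, 1.0),
      new Edge[Long, Double](2L, 3L, 2.0))
    val graph = Graph.fromCollection(vertices, edges, env)

    val vertexTuples = graph.getVerticesAsTuple2().collect()
    val edgeIds = graph.getEdgeIds().collect()

    // A triplet carries an edge together with both of its endpoint vertices
    val triplets = graph.getTriplets().collect().map { t =>
      s"${t.getSrcVertex.getValue} -> ${t.getTrgVertex.getValue} (${t.getEdge.getValue})"
    }
    (vertexTuples, edgeIds, triplets)
  }
}
```

`collect()` brings results back to the client, so it suits small graphs and tests; larger results are usually written to a sink instead.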

### Transformations

```scala { .api }
// Map transformations
def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV]
def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV]
def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV]

// Translation operations
def translateGraphIds[NEW: TypeInformation : ClassTag](translator: TranslateFunction[K, NEW]): Graph[NEW, VV, EV]
def translateVertexValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[VV, NEW]): Graph[K, NEW, EV]
def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[EV, NEW]): Graph[K, VV, NEW]
```
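
The function-based overloads of `mapVertices` and `mapEdges` could be used as sketched below. The lambda parameter types are written out explicitly so that overload resolution between the `MapFunction` and function variants stays unambiguous. `TransformationSketch`, `mapped`, and the sample data are assumptions for illustration, with flink-gelly-scala 1.3.x assumed on the classpath.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Hypothetical example object, not part of the library
object TransformationSketch {
  // Returns (vertex ID -> name length, sorted doubled edge weights)
  def mapped(): (Map[Long, Int], Seq[Double]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = List(
      new Vertex[Long, String](1L, "Alice"),
      new Vertex[Long, String](2L, "Bob"),
      new Vertex[Long, String](3L, "Carol"))
    val edges = List(
      new Edge[Long, Double](1L, 2L, 1.0),
      new Edge[Long, Double](2L, 3L, 2.0))
    val graph = Graph.fromCollection(vertices, edges, env)

    // Replace each vertex value with its string length; the vertex value type becomes Int
    val lengths: Graph[Long, Int, Double] =
      graph.mapVertices((v: Vertex[Long, String]) => v.getValue.length)

    // Double every edge weight; the edge value type stays Double
    val doubled: Graph[Long, Int, Double] =
      lengths.mapEdges((e: Edge[Long, Double]) => e.getValue * 2)

    val vertexMap = doubled.getVerticesAsTuple2().collect().toMap
    val weights = doubled.getEdgesAsTuple3().collect().map(_._3).sorted
    (vertexMap, weights)
  }
}
```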

[Complete Graph Operations Documentation](./graph-operations.md)

## Graph Analytics

Perform analytics and computations on graph structure.

### Degree Calculations

```scala { .api }
// Degree metrics
def inDegrees(): DataSet[(K, LongValue)]
def outDegrees(): DataSet[(K, LongValue)]
def getDegrees(): DataSet[(K, LongValue)]
```
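
The degree methods return Flink `LongValue` wrappers rather than plain `Long` values. The sketch below shows one way to unwrap them after collecting, assuming flink-gelly-scala 1.3.x and a local execution environment; `DegreeSketch` and `degrees` are hypothetical names.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Hypothetical example object, not part of the library
object DegreeSketch {
  // Returns (total degree per vertex, out-degree per vertex) as plain Scala maps
  def degrees(): (Map[Long, Long], Map[Long, Long]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = List(
      new Vertex[Long, String](1L, "A"),
      new Vertex[Long, String](2L, "B"),
      new Vertex[Long, String](3L, "C"))
    val edges = List(
      new Edge[Long, Double](1L, 2L, 1.0),
      new Edge[Long, Double](1L, 3L, 4.0),
      new Edge[Long, Double](2L, 3L, 2.0))
    val graph = Graph.fromCollection(vertices, edges, env)

    // getValue unwraps the LongValue into a primitive long
    val total = graph.getDegrees().collect().map { case (id, d) => id -> d.getValue }.toMap
    val out = graph.outDegrees().collect().map { case (id, d) => id -> d.getValue }.toMap
    (total, out)
  }
}
```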

### Graph Structure Operations

```scala { .api }
// Structural transformations
def getUndirected(): Graph[K, VV, EV]
def reverse(): Graph[K, VV, EV]
def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV]
def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
```
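
A possible use of the structural operations above is sketched here with a `FilterFunction` implementation, matching the signatures listed. It assumes flink-gelly-scala 1.3.x; `HeavyEdgeFilter`, `StructureSketch`, and the weight threshold are hypothetical.

```scala
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Hypothetical filter: keeps edges whose weight is strictly greater than 1.0
final class HeavyEdgeFilter extends FilterFunction[Edge[Long, Double]] {
  override def filter(edge: Edge[Long, Double]): Boolean = edge.getValue > 1.0
}

// Hypothetical example object, not part of the library
object StructureSketch {
  // Returns (edges kept by the filter, reversed edge IDs, edge count of the undirected graph)
  def run(): (Long, Set[(Long, Long)], Long) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = List(
      new Vertex[Long, String](1L, "A"),
      new Vertex[Long, String](2L, "B"),
      new Vertex[Long, String](3L, "C"))
    val edges = List(
      new Edge[Long, Double](1L, 2L, 1.0),
      new Edge[Long, Double](2L, 3L, 2.0))
    val graph = Graph.fromCollection(vertices, edges, env)

    val heavy = graph.filterOnEdges(new HeavyEdgeFilter) // drops the 1.0-weight edge
    val reversed = graph.reverse()                       // swaps source and target of every edge
    val undirected = graph.getUndirected()               // adds the opposite direction of every edge

    (heavy.numberOfEdges(), reversed.getEdgeIds().collect().toSet, undirected.numberOfEdges())
  }
}
```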

### Set Operations

```scala { .api }
// Graph set operations
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]

// Graph validation
def validate(validator: GraphValidator[K, VV, EV]): Boolean
```
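
The sketch below applies `union` and `difference` to two small graphs that share one vertex. It assumes flink-gelly-scala 1.3.x and relies on the behavior described in the Gelly documentation: `union` merges duplicate vertices, and `difference` removes the other graph's vertices together with any edges attached to them. `SetOperationsSketch` and the sample graphs are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Hypothetical example object, not part of the library
object SetOperationsSketch {
  // Returns (union vertex count, union edge count, difference vertex IDs, difference edge IDs)
  def run(): (Long, Long, Set[Long], Set[(Long, Long)]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val first = Graph.fromCollection(
      List(new Vertex[Long, String](1L, "A"), new Vertex[Long, String](2L, "B"),
        new Vertex[Long, String](3L, "C")),
      List(new Edge[Long, Double](1L, 2L, 1.0), new Edge[Long, Double](2L, 3L, 2.0)),
      env)

    // Shares vertex 3 with the first graph
    val second = Graph.fromCollection(
      List(new Vertex[Long, String](3L, "C"), new Vertex[Long, String](4L, "D")),
      List(new Edge[Long, Double](3L, 4L, 3.0)),
      env)

    val merged = first.union(second)
    val remaining = first.difference(second)

    (merged.numberOfVertices(), merged.numberOfEdges(),
      remaining.getVertexIds().collect().toSet, remaining.getEdgeIds().collect().toSet)
  }
}
```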

[Complete Graph Analytics Documentation](./graph-analytics.md)

## Iterative Algorithms

Support for implementing custom graph algorithms using different iteration patterns.

### Iteration Types

```scala { .api }
// Scatter-Gather iterations
// (ScatterFunction and GatherFunction from org.apache.flink.graph.spargel)
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV], gatherFunction: GatherFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]

// Gather-Sum-Apply iterations
// (GatherFunction, SumFunction, and ApplyFunction from org.apache.flink.graph.gsa)
def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction: SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]

// Vertex-centric iterations (Pregel-style)
// (ComputeFunction and MessageCombiner from org.apache.flink.graph.pregel)
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M], combineFunction: MessageCombiner[K, M], maxIterations: Int): Graph[K, VV, EV]

// Algorithm framework integration
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```
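
To make the scatter-gather pattern concrete, here is a minimal single-source shortest paths sketch built on `runScatterGatherIteration`. It is one illustrative formulation, not the library's own implementation. It assumes flink-gelly-scala 1.3.x, vertex values that hold the current distance (0.0 for the source, infinity elsewhere), and the hypothetical names `DistanceScatter`, `DistanceGather`, and `ShortestPathSketch`.

```scala
import scala.collection.JavaConverters._

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
import org.apache.flink.graph.{Edge, Vertex}

// Scatter phase: offer each out-neighbor the distance reachable through this vertex
final class DistanceScatter extends ScatterFunction[Long, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
    for (edge <- getEdges.asScala) {
      sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
    }
  }
}

// Gather phase: keep the smallest offered distance if it improves on the current value
final class DistanceGather extends GatherFunction[Long, Double, Double] {
  override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]): Unit = {
    var best = Double.PositiveInfinity
    while (inMessages.hasNext) {
      best = math.min(best, inMessages.next())
    }
    if (best < vertex.getValue) setNewVertexValue(best)
  }
}

// Hypothetical example object, not part of the library
object ShortestPathSketch {
  // Returns the distance from vertex 1 to every vertex
  def distances(): Map[Long, Double] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = List(
      new Vertex[Long, Double](1L, 0.0), // source vertex
      new Vertex[Long, Double](2L, Double.PositiveInfinity),
      new Vertex[Long, Double](3L, Double.PositiveInfinity))
    val edges = List(
      new Edge[Long, Double](1L, 2L, 1.0),
      new Edge[Long, Double](2L, 3L, 2.0))
    val graph = Graph.fromCollection(vertices, edges, env)

    val result = graph.runScatterGatherIteration(new DistanceScatter, new DistanceGather, 10)
    result.getVerticesAsTuple2().collect().toMap
  }
}
```

The iteration stops once no vertex value changes or `maxIterations` is reached, so the limit of 10 is only an upper bound for this three-vertex graph.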

[Complete Iterative Algorithms Documentation](./iterative-algorithms.md)

## Built-in Graph Algorithms

Access to the comprehensive Gelly algorithm library through the Scala API.

### Pre-implemented Algorithms

```scala { .api }
// Algorithm execution
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```

**Available Algorithms:**
- **PageRank** - Computes the PageRank scores for graph vertices
- **Connected Components** - Labels vertices by connected component membership
- **Single Source Shortest Paths** - Computes shortest paths from a source vertex
- **Triangle Enumeration** - Enumerates all triangles in the graph
- **HITS Algorithm** - Computes hub and authority scores
- **Clustering Coefficients** - Measures clustering for vertices
- **Community Detection** - Identifies communities using label propagation
- **Graph Metrics** - Various centrality and structural measures

**Usage Examples:**

```scala
import org.apache.flink.graph.library.{PageRank, ConnectedComponents, SingleSourceShortestPaths}

// The library algorithms are Java classes, so constructor arguments are passed
// positionally; Scala named arguments do not compile against Java constructors.
// Check each class's package and type parameters against the Gelly release in use.

// PageRank with a damping factor of 0.85 and at most 10 iterations
val pageRankResult = graph.run(new PageRank[Long](0.85, 10))

// Connected Components with at most 10 iterations - finds connected subgraphs
val components = graph.run(new ConnectedComponents[Long, String, Double](10))

// Single Source Shortest Paths from the vertex with ID 1L, at most 10 iterations
val shortestPaths = graph.run(new SingleSourceShortestPaths[Long, Double](1L, 10))
```

## Neighborhood Operations

Operations for working with vertex neighborhoods and performing aggregations.

```scala { .api }
// Edge-based aggregations
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

// Neighbor-based aggregations
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

// Reduction operations
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]
```
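
As an illustration of the reduction operations, the sketch below uses `reduceOnEdges` to find the minimum outgoing edge weight per vertex. It assumes flink-gelly-scala 1.3.x; `MinWeight`, `NeighborhoodSketch`, and the sample data are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceEdgesFunction, Vertex}

// Hypothetical reducer: combines two edge values by keeping the smaller one
final class MinWeight extends ReduceEdgesFunction[Double] {
  override def reduceEdges(firstEdgeValue: Double, secondEdgeValue: Double): Double =
    math.min(firstEdgeValue, secondEdgeValue)
}

// Hypothetical example object, not part of the library
object NeighborhoodSketch {
  // Returns the minimum outgoing edge weight for each vertex that has out-edges
  def minOutWeights(): Map[Long, Double] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = List(
      new Vertex[Long, String](1L, "A"),
      new Vertex[Long, String](2L, "B"),
      new Vertex[Long, String](3L, "C"))
    val edges = List(
      new Edge[Long, Double](1L, 2L, 1.0),
      new Edge[Long, Double](1L, 3L, 4.0),
      new Edge[Long, Double](2L, 3L, 2.0))
    val graph = Graph.fromCollection(vertices, edges, env)

    // EdgeDirection.OUT restricts the reduction to each vertex's outgoing edges
    graph.reduceOnEdges(new MinWeight, EdgeDirection.OUT).collect().toMap
  }
}
```

Vertex 3 has no outgoing edges in this sample, so it does not appear in the result.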

## Graph Mutations

Methods for adding and removing vertices and edges.

```scala { .api }
// Adding elements
def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV]

// Removing elements
def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV]
def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
```
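
Each mutation method returns a new `Graph` rather than modifying the receiver, so calls can be chained. The sketch below assumes flink-gelly-scala 1.3.x and the documented Gelly behavior that removing a vertex also removes its edges; `MutationSketch` and the sample data are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Hypothetical example object, not part of the library
object MutationSketch {
  // Returns (vertices after addVertex, edges after addEdge,
  //          vertices after removeVertex, edges after removeVertex)
  def run(): (Long, Long, Long, Long) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val a = new Vertex[Long, String](1L, "A")
    val b = new Vertex[Long, String](2L, "B")
    val c = new Vertex[Long, String](3L, "C")
    val d = new Vertex[Long, String](4L, "D")
    val edges = List(
      new Edge[Long, Double](1L, 2L, 1.0),
      new Edge[Long, Double](2L, 3L, 2.0))
    val graph = Graph.fromCollection(List(a, b, c), edges, env)

    val withD = graph.addVertex(d)         // vertices 1, 2, 3, 4
    val linked = withD.addEdge(c, d, 3.0)  // adds the edge 3 -> 4
    val pruned = linked.removeVertex(b)    // drops vertex 2 and both edges touching it

    (withD.numberOfVertices(), linked.numberOfEdges(),
      pruned.numberOfVertices(), pruned.numberOfEdges())
  }
}
```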

## Join Operations

Join graph data with external datasets to enrich vertex and edge information.

```scala { .api }
// Vertex joins
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV]
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV): Graph[K, VV, EV]

// Edge joins
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV): Graph[K, VV, EV]
def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
```
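
The function-based join overloads might be used as in the sketch below, which enriches vertex values from an external `(id, score)` DataSet and rescales one edge weight. It assumes flink-gelly-scala 1.3.x and that unmatched vertices and edges keep their original values, as the Gelly documentation describes. `JoinSketch`, `scores`, and `factors` are hypothetical names.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Hypothetical example object, not part of the library
object JoinSketch {
  // Returns (vertex ID -> joined label, edge IDs -> joined weight)
  def run(): (Map[Long, String], Map[(Long, Long), Double]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = List(
      new Vertex[Long, String](1L, "A"),
      new Vertex[Long, String](2L, "B"),
      new Vertex[Long, String](3L, "C"))
    val edges = List(
      new Edge[Long, Double](1L, 2L, 1.0),
      new Edge[Long, Double](2L, 3L, 2.0))
    val graph = Graph.fromCollection(vertices, edges, env)

    // External data keyed by vertex ID; vertex 2 has no entry
    val scores: DataSet[(Long, Int)] = env.fromElements((1L, 10), (3L, 30))
    val labelled = graph.joinWithVertices(scores, (name: String, score: Int) => s"$name:$score")

    // External data keyed by (source, target); only the edge 1 -> 2 has an entry
    val factors: DataSet[(Long, Long, Double)] = env.fromElements((1L, 2L, 0.5))
    val rescaled = labelled.joinWithEdges(factors, (weight: Double, factor: Double) => weight * factor)

    val vertexMap = rescaled.getVerticesAsTuple2().collect().toMap
    val edgeMap = rescaled.getEdgesAsTuple3().collect().map { case (s, t, w) => (s, t) -> w }.toMap
    (vertexMap, edgeMap)
  }
}
```

The join function receives the current vertex or edge value first and the matched input value second, and must return a value of the graph's existing value type.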