Scala API for Apache Flink's Gelly graph processing library, providing distributed graph operations and algorithms.
# Apache Flink Gelly Scala API

## Overview

Apache Flink Gelly Scala is a graph processing library that provides a high-level Scala API for distributed graph analytics and algorithms on Apache Flink. It enables developers to build scalable graph processing applications using Scala's functional programming paradigms, with support for vertex-centric iterations, scatter-gather patterns, and a comprehensive set of graph operations.

## Package Information

- **Package Name**: flink-gelly-scala_2.10
- **Package Type**: maven
- **Language**: Scala 2.10
- **Installation**: Add as a Maven dependency

## Installation

Add to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-gelly-scala_2.10</artifactId>
  <version>1.3.3</version>
</dependency>
```

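## Core Imports

```scala
import org.apache.flink.graph.scala._        // Scala Graph class and its factory methods
import org.apache.flink.api.scala._          // ExecutionEnvironment, DataSet, implicit TypeInformation
import org.apache.flink.graph.{Edge, Vertex} // graph element types from the Gelly Java API
```
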
## Basic Usage

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create vertices and edges
val vertices = env.fromCollection(List(
  new Vertex(1L, "A"),
  new Vertex(2L, "B"),
  new Vertex(3L, "C")
))

val edges = env.fromCollection(List(
  new Edge(1L, 2L, 1.0),
  new Edge(2L, 3L, 2.0)
))

// Create graph: Graph[Long, String, Double]
val graph = Graph.fromDataSet(vertices, edges, env)

// Basic operations
val vertexCount = graph.numberOfVertices() // triggers job execution and returns a Long
val degrees = graph.getDegrees()            // lazy DataSet[(Long, LongValue)]
```

## Architecture

The Flink Gelly Scala library consists of several key components:

- **Graph Class**: The main graph representation with type-safe operations
- **Graph Factory Methods**: Multiple ways to create graphs from various data sources
- **Graph Transformations**: Methods for mapping, filtering, and transforming graphs
- **Graph Analytics**: Built-in algorithms and metrics calculations
- **Iterative Processing**: Support for custom graph algorithms using different iteration patterns

## Core Types

```scala { .api }
// Main graph type with key (K), vertex value (VV), and edge value (EV) type parameters
final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]

// Core graph elements from the Gelly Java API (subclasses of Flink's Tuple2, Tuple3 and Tuple5)
class Vertex[K, VV](id: K, value: VV)
class Edge[K, EV](source: K, target: K, value: EV)
class Triplet[K, VV, EV](srcVertexId: K, trgVertexId: K, srcVertexValue: VV, trgVertexValue: VV, edgeValue: EV)
```
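
Because `Vertex`, `Edge`, and `Triplet` are ordinary Java objects, their accessors can be tried without running a Flink job. The following is a minimal sketch, assuming only the Gelly classes on the classpath; the IDs and values are illustrative:

```scala
import org.apache.flink.graph.{Edge, Triplet, Vertex}

// A vertex is an (id, value) pair
val vertex = new Vertex(1L, "A")
val vertexId: Long = vertex.getId
val vertexValue: String = vertex.getValue

// An edge is a (source, target, value) triple
val edge = new Edge(1L, 2L, 1.5)
val source: Long = edge.getSource
val target: Long = edge.getTarget
val weight: Double = edge.getValue

// reverse() returns a new edge with source and target swapped
val reversed = edge.reverse()

// A triplet bundles an edge with the values of both endpoint vertices
val triplet = new Triplet(1L, 2L, "A", "B", 1.5)
val srcValue: String = triplet.getSrcVertex.getValue
val trgValue: String = triplet.getTrgVertex.getValue
```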

## Graph Construction

Create graphs from various data sources, including DataSets, collections, tuples, and CSV files.

### Basic Construction

```scala { .api }
// Factory methods on the Graph companion object

// From DataSets
def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
// Edges only: vertex values are NullValue, or computed from each vertex ID by vertexValueInitializer
def fromDataSet[K, EV](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromDataSet[K, VV, EV](edges: DataSet[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV]

// From Collections
def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromCollection[K, EV](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
```
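
When only edges are available, the vertex set is derived from the edge endpoints. The sketch below contrasts the `NullValue` default with a `vertexValueInitializer`; the `TimesTen` initializer is a hypothetical example, and a local execution environment is assumed:

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val edges = List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0))

// Vertices 1, 2 and 3 are derived from the edges; their values are NullValue
val plain = Graph.fromCollection(edges, env)

// Hypothetical initializer: derive each vertex value from its ID
final class TimesTen extends MapFunction[Long, Long] {
  override def map(id: Long): Long = id * 10
}
val initialized = Graph.fromDataSet(env.fromCollection(edges), new TimesTen, env)

val plainVertexCount = plain.numberOfVertices()
val initialValues = initialized.getVerticesAsTuple2().collect().toMap
```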

### Tuple-based Construction

```scala { .api }
// From Scala tuples: vertices as (id, value), edges as (source, target, value) or (source, target)
def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromTupleDataSet[K, EV](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromTuple2DataSet[K](edges: DataSet[(K, K)], env: ExecutionEnvironment): Graph[K, NullValue, NullValue]

// From CSV files: extensive configuration for file-based graph creation
def fromCsvReader[K, VV, EV](env: ExecutionEnvironment, pathEdges: String, ...): Graph[K, VV, EV]
```
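
Tuple-based construction avoids creating `Vertex` and `Edge` objects by hand. A minimal sketch, assuming a local execution environment and illustrative sample data:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Vertices as (id, value) and edges as (source, target, value)
val vertexTuples = env.fromElements((1L, "A"), (2L, "B"), (3L, "C"))
val edgeTuples = env.fromElements((1L, 2L, 1.0), (2L, 3L, 2.0))
val weighted = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

// Bare (source, target) pairs: vertex and edge values are both NullValue
val pairs = env.fromElements((1L, 2L), (2L, 3L), (3L, 1L))
val unweighted = Graph.fromTuple2DataSet(pairs, env)

val weightedEdgeCount = weighted.numberOfEdges()
val unweightedVertexCount = unweighted.numberOfVertices()
```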

[Complete Graph Construction Documentation](./graph-construction.md)

## Graph Operations

Access graph data and perform basic transformations.

### Data Access

```scala { .api }
// Access graph components
def getVertices: DataSet[Vertex[K, VV]]
def getEdges: DataSet[Edge[K, EV]]
def getVerticesAsTuple2(): DataSet[(K, VV)]
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
def getTriplets(): DataSet[Triplet[K, VV, EV]]

// Counts (these trigger job execution) and ID projections
def numberOfVertices(): Long
def numberOfEdges(): Long
def getVertexIds(): DataSet[K]
def getEdgeIds(): DataSet[(K, K)]
```
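
Triplets are convenient when a computation needs both endpoint values next to the edge value. The sketch below, assuming a local environment and a small illustrative graph, pulls triplets and edge IDs to the client with `collect()`, which executes the job:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  List(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// Each triplet joins an edge with the values of its source and target vertices
val triplets = graph.getTriplets().collect()
  .map(t => (t.getSrcVertex.getValue, t.getTrgVertex.getValue, t.getEdge.getValue))
  .sortBy(_._3)

// getEdgeIds() keeps only the (source, target) pairs
val edgeIds = graph.getEdgeIds().collect().sorted
```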

### Transformations

```scala { .api }
// Map transformations: pass a Flink MapFunction or a plain Scala function
def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV]
def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV]
def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV]

// Translation operations: rewrite vertex IDs, vertex values, or edge values with a TranslateFunction
def translateGraphIds[NEW: TypeInformation : ClassTag](translator: TranslateFunction[K, NEW]): Graph[NEW, VV, EV]
def translateVertexValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[VV, NEW]): Graph[K, NEW, EV]
def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[EV, NEW]): Graph[K, VV, NEW]
```
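
The Scala-function overloads of `mapVertices` and `mapEdges` read naturally with lambdas. A minimal sketch, assuming a local environment; the explicit lambda parameter types are a stylistic choice that keeps overload resolution unambiguous:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  List(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// New vertex values computed from each vertex; IDs and edges are unchanged
val lowered = graph.mapVertices((v: Vertex[Long, String]) => v.getValue.toLowerCase)
// New edge values computed from each edge; vertices are unchanged
val scaled = graph.mapEdges((e: Edge[Long, Double]) => e.getValue * 10)

val vertexValues = lowered.getVerticesAsTuple2().collect().toMap
val edgeValues = scaled.getEdgesAsTuple3().collect().map(_._3).sorted
```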

[Complete Graph Operations Documentation](./graph-operations.md)

## Graph Analytics

Compute vertex degrees, derive structural variants of a graph, and combine graphs with set operations.

### Degree Calculations

```scala { .api }
// Degree metrics as (vertex ID, degree) pairs; getDegrees() counts both in- and out-edges
def inDegrees(): DataSet[(K, LongValue)]
def outDegrees(): DataSet[(K, LongValue)]
def getDegrees(): DataSet[(K, LongValue)]
```
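
Degrees are returned as Flink `LongValue` objects, so a `getValue` call is needed to obtain a plain `Long`. A minimal sketch on a small path graph 1 → 2 → 3, assuming a local environment; `degreeMap` is a hypothetical helper:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.types.LongValue

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  List(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// Hypothetical helper: collect (id, LongValue) pairs into a Map[Long, Long]
def degreeMap(degrees: DataSet[(Long, LongValue)]): Map[Long, Long] =
  degrees.collect().map { case (id, d) => (id, d.getValue) }.toMap

val inDeg = degreeMap(graph.inDegrees())
val outDeg = degreeMap(graph.outDegrees())
val totalDeg = degreeMap(graph.getDegrees())
```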

### Graph Structure Operations

```scala { .api }
// Structural transformations
def getUndirected(): Graph[K, VV, EV] // adds a reversed copy of every edge
def reverse(): Graph[K, VV, EV]       // swaps source and target of every edge
// Keep vertices and edges passing the filters; edges with a removed endpoint are dropped as well
def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV]
// Filters edges only; all vertices are kept
def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
```
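
The following sketch exercises `filterOnEdges`, `reverse`, and `getUndirected` on the path graph 1 → 2 → 3. It assumes a local environment, and the weight threshold of 1.0 is arbitrary:

```scala
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  List(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// Keep only edges heavier than 1.0; all three vertices remain
val heavy = graph.filterOnEdges(new FilterFunction[Edge[Long, Double]] {
  override def filter(edge: Edge[Long, Double]): Boolean = edge.getValue > 1.0
})
val heavyEdgeCount = heavy.numberOfEdges()
val heavyVertexCount = heavy.numberOfVertices()

// reverse() flips every edge; getUndirected() adds a reversed copy of each edge
val reversedIds = graph.reverse().getEdgeIds().collect().sorted
val undirectedEdgeCount = graph.getUndirected().numberOfEdges()
```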

### Set Operations

```scala { .api }
// Graph set operations
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
// distinctEdges = true keeps each common edge once; vertex values become NullValue
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]

// Graph validation
def validate(validator: GraphValidator[K, VV, EV]): Boolean
```
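
To see the difference between `intersect` and `union`, consider two edge-only graphs that share one edge. A minimal sketch, assuming a local environment; both graphs use `NullValue` vertex values so their types match:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Two graphs sharing the edge 1 -> 2 with the same value
val g1 = Graph.fromCollection(List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)), env)
val g2 = Graph.fromCollection(List(new Edge(1L, 2L, 1.0), new Edge(3L, 4L, 5.0)), env)

// Edges with equal source, target and value in both graphs, each kept once
val commonEdges = g1.intersect(g2, true).getEdgesAsTuple3().collect()

// Union de-duplicates vertices, so vertices 1 to 4 appear once each
val unionVertexCount = g1.union(g2).numberOfVertices()
```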

[Complete Graph Analytics Documentation](./graph-analytics.md)

## Iterative Algorithms

Custom iterative graph algorithms can be written against three iteration models: scatter-gather, gather-sum-apply, and vertex-centric (Pregel-style).

### Iteration Types

```scala { .api }
// Scatter-Gather iterations (functions from org.apache.flink.graph.spargel)
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV], gatherFunction: GatherFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]

// Gather-Sum-Apply iterations (functions from org.apache.flink.graph.gsa; this GatherFunction is a different class from the spargel one)
def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction: SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]

// Vertex-centric iterations (Pregel-style; functions from org.apache.flink.graph.pregel)
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M], combineFunction: MessageCombiner[K, M], maxIterations: Int): Graph[K, VV, EV]

// Algorithm framework integration
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```
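
As an illustration of the vertex-centric model, the sketch below computes single-source shortest paths with a `ComputeFunction` and a `MessageCombiner`. It follows the common shortest-paths pattern for Pregel-style iterations; the class names `MinDistanceCompute` and `MinDistanceCombiner`, the sample graph, and the use of `Double.PositiveInfinity` as the initial vertex value are assumptions made for this example:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, MessageIterator}
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import scala.collection.JavaConverters._

// Hypothetical compute function: relaxes distances from the source vertex srcId
final class MinDistanceCompute(srcId: Long)
    extends ComputeFunction[Long, Double, Double, Double] {

  override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
    // The source is at distance 0; other vertices only learn distances from messages
    var minDistance = if (vertex.getId == srcId) 0.0 else Double.PositiveInfinity
    while (messages.hasNext) {
      minDistance = math.min(minDistance, messages.next())
    }
    // Update and notify out-neighbors only when a shorter path was found
    if (minDistance < vertex.getValue) {
      setNewVertexValue(minDistance)
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, minDistance + edge.getValue)
      }
    }
  }
}

// Hypothetical combiner: forwards only the smallest candidate distance per target vertex
final class MinDistanceCombiner extends MessageCombiner[Long, Double] {
  override def combineMessages(messages: MessageIterator[Double]): Unit = {
    var min = Double.PositiveInfinity
    while (messages.hasNext) {
      min = math.min(min, messages.next())
    }
    sendCombinedMessage(min)
  }
}

val env = ExecutionEnvironment.getExecutionEnvironment
// Every vertex starts unreached at +infinity
val graph = Graph.fromCollection(
  List(1L, 2L, 3L).map(id => new Vertex(id, Double.PositiveInfinity)),
  List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0), new Edge(1L, 3L, 5.0)),
  env)

// At most 10 supersteps; the iteration also ends once no messages are sent
val result = graph.runVertexCentricIteration(new MinDistanceCompute(1L), new MinDistanceCombiner, 10)
val distances = result.getVerticesAsTuple2().collect().toMap
```

Vertex 3 is first reached directly over the 5.0 edge and is then improved to 3.0 via vertex 2 in a later superstep, which is why the compute function only sends messages when the distance actually shrinks.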

[Complete Iterative Algorithms Documentation](./iterative-algorithms.md)

## Built-in Graph Algorithms

Algorithms from the Gelly library can be executed from the Scala API through `Graph.run`.

### Pre-implemented Algorithms

```scala { .api }
// Algorithm execution
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```

**Available Algorithms:**

- **PageRank** - Computes the PageRank scores for graph vertices
- **Connected Components** - Labels vertices by connected component membership
- **Single Source Shortest Paths** - Computes shortest paths from a source vertex
- **Triangle Enumeration** - Enumerates all triangles in the graph
- **HITS Algorithm** - Computes hub and authority scores
- **Clustering Coefficients** - Measures clustering for vertices
- **Community Detection** - Identifies communities using label propagation
- **Graph Metrics** - Vertex and edge statistics such as counts and degree measures

**Usage Examples:**

```scala
import org.apache.flink.graph.library.{ConnectedComponents, SingleSourceShortestPaths}
// Assumed package of the PageRank implementation in Flink 1.3
import org.apache.flink.graph.library.link_analysis.PageRank

// `graph` is the Graph[Long, String, Double] from Basic Usage.
// The algorithms are Java classes, so constructor arguments are positional (no named arguments).

// PageRank with damping factor 0.85 and 10 iterations
val pageRankResult = graph.run(new PageRank[Long, String, Double](0.85, 10))

// Connected Components with at most 10 iterations (vertex values must be Comparable)
val components = graph.run(new ConnectedComponents[Long, String, Double](10))

// Single Source Shortest Paths from vertex 1L with at most 10 iterations; edge values are distances
val shortestPaths = graph.run(new SingleSourceShortestPaths[Long, String](1L, 10))
```

## Neighborhood Operations

Aggregate over each vertex's incident edges or neighboring vertices; the `EdgeDirection` argument (`IN`, `OUT`, or `ALL`) selects which edges are considered.

```scala { .api }
// Edge-based aggregations
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

// Neighbor-based aggregations
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

// Reduction operations: the reduce function must be associative and commutative
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]
```
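
A typical use of `reduceOnEdges` is summing edge weights per vertex. The sketch below, assuming a local environment and a hypothetical `SumWeights` function, sums outgoing weights; in this graph vertex 3 has no outgoing edges, so it produces no entry:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceEdgesFunction, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  List(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  List(new Edge(1L, 2L, 1.0), new Edge(1L, 3L, 4.0), new Edge(2L, 3L, 2.0)),
  env)

// Hypothetical reducer: addition is associative and commutative, as required
final class SumWeights extends ReduceEdgesFunction[Double] {
  override def reduceEdges(first: Double, second: Double): Double = first + second
}

// Sum of outgoing edge weights per source vertex
val outWeights = graph.reduceOnEdges(new SumWeights, EdgeDirection.OUT).collect().toMap
```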

## Graph Mutations

Methods for adding and removing vertices and edges. Each method returns a new `Graph`; the original graph is unchanged.

```scala { .api }
// Adding elements
def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV] // also adds missing endpoint vertices

// Removing elements (removing a vertex also removes its incident edges)
def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV]
def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
```
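
The sketch below shows both directions on the path graph 1 → 2 → 3, assuming a local environment. The existing vertex 3 is passed with its current value `"C"`, on the assumption that the graph union behind `addEdge` then keeps a single copy of it:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  List(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// Adding an edge to the new vertex 4 also adds that vertex
val grown = graph.addEdge(new Vertex(3L, "C"), new Vertex(4L, "D"), 3.0)

// Removing vertex 2 also drops both edges that touch it
val shrunk = graph.removeVertex(new Vertex(2L, "B"))

val grownCounts = (grown.numberOfVertices(), grown.numberOfEdges())
val shrunkCounts = (shrunk.numberOfVertices(), shrunk.numberOfEdges())
```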

## Join Operations

Join graph data with external datasets to enrich vertex and edge information.

```scala { .api }
// Vertex joins: match on vertex ID; vertices without a match keep their current value
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV]
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV): Graph[K, VV, EV]

// Edge joins: match on (source, target), or on the source or target ID alone
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV): Graph[K, VV, EV]
def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
```
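
A minimal sketch of the function-based `joinWithVertices` overload, assuming a local environment and illustrative suffix data; vertex 2 has no matching input record, so its value stays unchanged:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  List(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  List(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// (vertex ID, input value) pairs to merge into the vertex values
val suffixes = env.fromElements((1L, "!"), (3L, "?"))

// The Scala function combines the current vertex value with the matched input value
val joined = graph.joinWithVertices(suffixes, (value: String, suffix: String) => value + suffix)
val values = joined.getVerticesAsTuple2().collect().toMap
```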