# Flink Gelly Scala

Flink Gelly Scala provides idiomatic Scala APIs for Gelly, Apache Flink's graph processing library. It offers type-safe, functional programming support for large-scale graph processing, including graph transformations, iterative algorithms, and graph analytics, all running on Flink's distributed processing engine.

## Package Information

- **Package Name**: flink-gelly-scala_2.11
- **Package Type**: maven
- **Language**: Scala
- **Installation**:
```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-scala_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
```

## Core Imports

```scala
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._
```

## Basic Usage

```scala
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create vertices and edges
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob"),
  new Vertex(3L, "Charlie")
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5),
  new Edge(2L, 3L, 0.3),
  new Edge(1L, 3L, 0.8)
))

// Create graph
val graph = Graph.fromDataSet(vertices, edges, env)

// Perform basic operations
val degrees = graph.getDegrees()
val filteredGraph = graph.filterOnVertices(_.getValue.length > 3)
val mappedGraph = graph.mapVertices(v => v.getValue.toUpperCase)
```

## Architecture

Flink Gelly Scala is built around several key components:

- **Graph Class**: Main graph representation with type parameters K (key), VV (vertex value), EV (edge value)
- **Factory Methods**: Multiple ways to create graphs from DataSets, Collections, CSV files, and tuples
- **Transformations**: Type-safe operations for mapping, filtering, and manipulating graph structure
- **Iterative Algorithms**: Support for scatter-gather, gather-sum-apply, and vertex-centric computation models
- **Analytics**: Built-in graph metrics and custom reduction operations
- **Integration**: Seamless interoperability with Flink DataSets and the broader Flink ecosystem

## Capabilities

### Graph Creation and Management

Core functionality for creating, modifying, and managing graph structures with various data sources and formats.

```scala { .api }
// Factory methods in Graph companion object
object Graph {
  def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
                             env: ExecutionEnvironment): Graph[K, VV, EV]
  def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]],
                                env: ExecutionEnvironment): Graph[K, VV, EV]
  def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
                                  env: ExecutionEnvironment): Graph[K, VV, EV]
}

// Graph modification methods
class Graph[K, VV, EV] {
  def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
  def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV]
  def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
  def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
}
```

[Graph Creation and Management](./graph-creation.md)
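
As a minimal sketch of how the factory and modification methods above combine, the example below builds a graph from tuple DataSets and then adds one vertex and one edge. The object name `GraphCreationSketch`, the helper `counts`, and the sample data are hypothetical and exist only for illustration. It assumes the `flink-gelly-scala` dependency and a local Flink runtime are on the classpath.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.scala._

// Hypothetical example object; names and data are for illustration only.
object GraphCreationSketch {

  /** Builds a small graph, extends it, and returns (vertex count, edge count). */
  def counts(): (Long, Long) = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Tuples are (id, value) for vertices and (source, target, value) for edges.
    val vertexTuples = env.fromElements((1L, "Alice"), (2L, "Bob"))
    val edgeTuples = env.fromElements((1L, 2L, 0.5))
    val graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

    // Each modification returns a new Graph value rather than mutating `graph`.
    val bob = new Vertex(2L, "Bob")
    val carol = new Vertex(3L, "Carol")
    val extended = graph.addVertex(carol).addEdge(bob, carol, 0.3)

    // Counting forces the (otherwise lazy) Flink job to execute.
    (extended.numberOfVertices(), extended.numberOfEdges())
  }

  def main(args: Array[String]): Unit = {
    val (vertexCount, edgeCount) = counts()
    println(s"vertices=$vertexCount, edges=$edgeCount")
  }
}
```

Because `addVertex` and `addEdge` return new graphs, the calls chain naturally and the original `graph` value can still be reused afterwards.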
96
97
### Graph Transformations
98
99
Type-safe transformation operations for mapping vertex and edge values, filtering, and structural modifications.
100
101
```scala { .api }
102
class Graph[K, VV, EV] {
103
def mapVertices[NV](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
104
def mapEdges[NV](fun: Edge[K, EV] => NV): Graph[K, VV, NV]
105
def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean,
106
edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]
107
def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV]
108
def translateGraphIds[NEW](fun: (K, NEW) => NEW): Graph[NEW, VV, EV]
109
}
110
```
111
112
[Graph Transformations](./graph-transformations.md)
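
The transformations above can be chained, since each one returns a new graph. The following sketch maps vertex and edge values and then keeps only the part of the graph that passes both filters. `GraphTransformSketch`, `run`, and the sample data are hypothetical names chosen for this illustration, and the thresholds are arbitrary.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._

// Hypothetical example object; names, data, and thresholds are illustrative.
object GraphTransformSketch {

  /** Returns the vertices and edges left after mapping and filtering. */
  def run(): (Set[(Long, String)], Set[(Long, Long, Double)]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val graph = Graph.fromTupleDataSet(
      env.fromElements((1L, "Alice"), (2L, "Bob"), (3L, "Charlie")),
      env.fromElements((1L, 2L, 1.0), (2L, 3L, 2.0), (1L, 3L, 5.0)),
      env)

    val result = graph
      .mapVertices(v => v.getValue.toUpperCase) // vertex values become upper case
      .mapEdges(e => e.getValue * 10)           // edge values are scaled by 10
      // Keep vertices with long names and edges with a scaled value of at least 20.
      // An edge is kept only if both of its endpoints are kept as well.
      .subgraph(v => v.getValue.length > 3, e => e.getValue >= 20.0)

    (result.getVerticesAsTuple2().collect().toSet,
     result.getEdgesAsTuple3().collect().toSet)
  }

  def main(args: Array[String]): Unit = {
    val (vertices, edges) = run()
    println(s"vertices=$vertices")
    println(s"edges=$edges")
  }
}
```

Note that dropping the vertex "BOB" also drops the edge from 2 to 3, even though that edge passes the edge filter on its own.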

### Graph Algorithms

Iterative computation models for implementing sophisticated graph algorithms using scatter-gather, gather-sum-apply, and vertex-centric paradigms.

```scala { .api }
class Graph[K, VV, EV] {
  def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                   gatherFunction: GatherFunction[K, VV, M],
                                   maxIterations: Int): Graph[K, VV, EV]

  def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
                                    sumFunction: SumFunction[VV, EV, M],
                                    applyFunction: ApplyFunction[K, VV, M],
                                    maxIterations: Int): Graph[K, VV, EV]

  def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                   combineFunction: MessageCombiner[K, M],
                                   maxIterations: Int): Graph[K, VV, EV]
}
```

[Graph Algorithms](./graph-algorithms.md)
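
To make the scatter-gather model more concrete, here is a sketch of single-source shortest paths written against `runScatterGatherIteration`. It assumes the `ScatterFunction`, `GatherFunction`, and `MessageIterator` classes from Gelly's `org.apache.flink.graph.spargel` package. The names `ShortestPathsSketch`, `MinDistanceMessenger`, `VertexDistanceUpdater`, and `distances`, along with the sample graph and the iteration limit of 10, are hypothetical choices for this illustration.

```scala
import scala.collection.JavaConverters._

import org.apache.flink.api.scala._
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.scala._
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}

// Hypothetical example object; names and data are for illustration only.
object ShortestPathsSketch {

  // Scatter phase: a vertex with a finite distance proposes
  // (own distance + edge weight) to each of its out-neighbors.
  final class MinDistanceMessenger extends ScatterFunction[Long, Double, Double, Double] {
    override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
      if (vertex.getValue < Double.PositiveInfinity) {
        for (edge <- getEdges.asScala) {
          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
        }
      }
    }
  }

  // Gather phase: a vertex adopts the smallest proposed distance if it improves on its own.
  final class VertexDistanceUpdater extends GatherFunction[Long, Double, Double] {
    override def updateVertex(vertex: Vertex[Long, Double],
                              inMessages: MessageIterator[Double]): Unit = {
      var minDistance = Double.PositiveInfinity
      while (inMessages.hasNext) {
        minDistance = math.min(minDistance, inMessages.next())
      }
      if (minDistance < vertex.getValue) setNewVertexValue(minDistance)
    }
  }

  /** Returns the shortest distance from vertex 1 to every vertex. */
  def distances(): Map[Long, Double] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Vertex values hold the current distance: 0 for the source, infinity elsewhere.
    val graph = Graph.fromTupleDataSet(
      env.fromElements((1L, 0.0), (2L, Double.PositiveInfinity), (3L, Double.PositiveInfinity)),
      env.fromElements((1L, 2L, 1.0), (2L, 3L, 2.0), (1L, 3L, 5.0)),
      env)

    graph
      .runScatterGatherIteration(new MinDistanceMessenger, new VertexDistanceUpdater, 10)
      .getVerticesAsTuple2()
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = println(distances())
}
```

The iteration stops early once no vertex changes its value, so `maxIterations` acts as an upper bound rather than a fixed number of supersteps.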

### Graph Analytics

Built-in graph metrics, degree calculations, neighborhood operations, and custom reduction functions for graph analysis.

```scala { .api }
class Graph[K, VV, EV] {
  def getDegrees(): DataSet[(K, LongValue)]
  def inDegrees(): DataSet[(K, LongValue)]
  def outDegrees(): DataSet[(K, LongValue)]
  def numberOfVertices(): Long
  def numberOfEdges(): Long
  def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV],
                        direction: EdgeDirection): DataSet[(K, VV)]
  def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunction[K, VV, EV, T],
                                direction: EdgeDirection): DataSet[T]
}
```

[Graph Analytics](./graph-analytics.md)
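
The sketch below illustrates two of the analytics methods above: `getDegrees` and `reduceOnNeighbors` with `EdgeDirection.IN`. It assumes `ReduceNeighborsFunction` and `EdgeDirection` come from the Java Gelly package `org.apache.flink.graph`. `GraphAnalyticsSketch`, `SumNeighbors`, `sampleGraph`, and the sample data are hypothetical names used only here.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{EdgeDirection, ReduceNeighborsFunction}
import org.apache.flink.graph.scala._

// Hypothetical example object; names and data are for illustration only.
object GraphAnalyticsSketch {

  // Combines two neighbor values into one; Gelly applies it pairwise per vertex.
  final class SumNeighbors extends ReduceNeighborsFunction[Long] {
    override def reduceNeighbors(first: Long, second: Long): Long = first + second
  }

  private def sampleGraph(env: ExecutionEnvironment): Graph[Long, Long, Double] =
    Graph.fromTupleDataSet(
      env.fromElements((1L, 10L), (2L, 20L), (3L, 30L)),
      env.fromElements((1L, 2L, 0.5), (2L, 3L, 0.3), (1L, 3L, 0.8)),
      env)

  /** Total degree (in + out) per vertex id. */
  def degrees(): Map[Long, Long] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    sampleGraph(env).getDegrees().collect()
      .map { case (id, degree) => id -> degree.getValue } // unwrap Flink's LongValue
      .toMap
  }

  /** Sum of the values of each vertex's in-neighbors. */
  def inNeighborSums(): Map[Long, Long] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    sampleGraph(env).reduceOnNeighbors(new SumNeighbors, EdgeDirection.IN).collect().toMap
  }

  def main(args: Array[String]): Unit = {
    println(s"degrees=${degrees()}")
    println(s"inNeighborSums=${inNeighborSums()}")
  }
}
```

In this sketch vertex 1 has no incoming edges, so it does not appear in the result of `inNeighborSums`.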

### User-Defined Functions

Abstract base classes for implementing custom graph processing functions with access to vertex values, edges, and neighborhoods.

```scala { .api }
abstract class EdgesFunction[K, EV, T] {
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}

abstract class NeighborsFunction[K, VV, EV, T] {
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])],
                       out: Collector[T]): Unit
}

abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] {
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}
```

[User-Defined Functions](./user-defined-functions.md)
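
As an illustration of how one of these base classes might be implemented, the sketch below extends `EdgesFunctionWithVertexValue` to sum the weights of each vertex's outgoing edges. It assumes Gelly's `groupReduceOnEdges` method, which is not listed in this overview, as the entry point that accepts such a function. `EdgesFunctionSketch`, `TotalEdgeWeight`, `outWeights`, and the sample data are hypothetical names for this example.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.graph.scala._
import org.apache.flink.util.Collector

// Hypothetical example object; names and data are for illustration only.
object EdgesFunctionSketch {

  // Emits (vertex value, total weight of the edges passed in).
  // Vertices that receive no edges emit nothing.
  final class TotalEdgeWeight
      extends EdgesFunctionWithVertexValue[Long, String, Double, (String, Double)] {
    override def iterateEdges(v: Vertex[Long, String],
                              edges: Iterable[Edge[Long, Double]],
                              out: Collector[(String, Double)]): Unit = {
      var total = 0.0
      var count = 0
      // Traverse the edges exactly once, accumulating weight and count together.
      for (edge <- edges) {
        total += edge.getValue
        count += 1
      }
      if (count > 0) out.collect((v.getValue, total))
    }
  }

  /** Total outgoing edge weight per vertex name. */
  def outWeights(): Set[(String, Double)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val graph = Graph.fromTupleDataSet(
      env.fromElements((1L, "Alice"), (2L, "Bob"), (3L, "Charlie")),
      env.fromElements((1L, 2L, 1.0), (2L, 3L, 2.0), (1L, 3L, 5.0)),
      env)

    graph.groupReduceOnEdges(new TotalEdgeWeight, EdgeDirection.OUT).collect().toSet
  }

  def main(args: Array[String]): Unit = println(outWeights())
}
```

Because the function writes to a `Collector`, it may emit zero, one, or several records per vertex, which is the main difference from the pairwise reduce style.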

### Data Integration

Operations for joining graphs with external datasets and converting between different data representations.

```scala { .api }
class Graph[K, VV, EV] {
  def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
                          fun: (VV, T) => VV): Graph[K, VV, EV]
  def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
                       fun: (EV, T) => EV): Graph[K, VV, EV]
  def getVerticesAsTuple2(): DataSet[(K, VV)]
  def getEdgesAsTuple3(): DataSet[(K, K, EV)]
  def getTriplets(): DataSet[Triplet[K, VV, EV]]
}
```

[Data Integration](./data-integration.md)
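
The following sketch shows one way `joinWithVertices` and `getVerticesAsTuple2` might be used together: an external DataSet keyed by vertex id is merged into the vertex values, and the result is read back as plain tuples. `JoinWithVerticesSketch`, `run`, `bonuses`, and the sample data are hypothetical names for this illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._

// Hypothetical example object; names and data are for illustration only.
object JoinWithVerticesSketch {

  /** Returns vertex values after adding the matching external value, keyed by vertex id. */
  def run(): Map[Long, Long] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val graph = Graph.fromTupleDataSet(
      env.fromElements((1L, 10L), (2L, 20L), (3L, 30L)),
      env.fromElements((1L, 2L, 1.0), (2L, 3L, 2.0)),
      env)

    // External data keyed by vertex id; vertex 2 has no entry.
    val bonuses = env.fromElements((1L, 5L), (3L, 7L))

    // The function receives (current vertex value, joined value) and returns the new value.
    val updated = graph.joinWithVertices(bonuses, (value: Long, bonus: Long) => value + bonus)

    updated.getVerticesAsTuple2().collect().toMap
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Vertices without a matching entry in the input DataSet keep their original value, so the join never removes vertices from the graph.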

## Types

```scala { .api }
class Graph[K, VV, EV]

// Core graph elements (from Java Gelly)
class Vertex[K, VV](id: K, value: VV)
class Edge[K, EV](source: K, target: K, value: EV)
class Triplet[K, VV, EV] // Represents (srcVertex, edge, trgVertex)

// Direction enumeration
object EdgeDirection {
  val IN: EdgeDirection
  val OUT: EdgeDirection
  val ALL: EdgeDirection
}

// Utility mappers
class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]]
class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]]
class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)]
class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)]
```