# Graph Transformations

Type-safe operations for transforming vertex and edge values, filtering graph elements, and modifying graph structure while preserving the distributed processing benefits of Flink.

## Capabilities

### Vertex Transformations

Apply functions to vertex values while preserving graph topology.

```scala { .api }
/**
 * Apply a function to the attribute of each vertex in the graph.
 * @param mapper the map function to apply.
 * @return a new graph
 */
def mapVertices[NV](mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV]

/**
 * Apply a function to the attribute of each vertex in the graph.
 * @param fun the map function to apply.
 * @return a new graph
 */
def mapVertices[NV](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
```
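
As a minimal sketch of the two overloads, the example below applies a named `MapFunction` and an equivalent function literal to the same small graph. The object name `MapVerticesSketch`, the mapper class `NameLength`, and the sample data are illustrative assumptions, not part of the API.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object MapVerticesSketch {
  // Hypothetical reusable mapper: replaces each vertex name with its length.
  final class NameLength extends MapFunction[Vertex[Long, String], Int] {
    override def map(vertex: Vertex[Long, String]): Int = vertex.getValue.length
  }

  // Collects a graph's vertices into an id -> value map for inspection.
  private def vertexValues(g: Graph[Long, Int, Double]): Map[Long, Int] =
    g.getVertices.collect().map(v => v.getId -> v.getValue).toMap

  def run(): (Map[Long, Int], Map[Long, Int]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = env.fromCollection(Seq(new Vertex(1L, "Alice"), new Vertex(2L, "Bob")))
    val edges = env.fromCollection(Seq(new Edge(1L, 2L, 0.5)))
    val graph: Graph[Long, String, Double] = Graph.fromDataSet(vertices, edges, env)

    // MapFunction overload: useful when the mapper is shared or needs its own state.
    val viaClass = graph.mapVertices(new NameLength)
    // Function overload, written with an explicit parameter type.
    val viaLambda = graph.mapVertices((v: Vertex[Long, String]) => v.getValue.length)

    (vertexValues(viaClass), vertexValues(viaLambda))
  }
}
```

Both calls produce a `Graph[Long, Int, Double]`: only the vertex value type changes, while IDs and edges are carried over.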

### Edge Transformations

Apply functions to edge values while preserving graph connectivity.

```scala { .api }
/**
 * Apply a function to the attribute of each edge in the graph.
 * @param mapper the map function to apply.
 * @return a new graph
 */
def mapEdges[NV](mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV]

/**
 * Apply a function to the attribute of each edge in the graph.
 * @param fun the map function to apply.
 * @return a new graph
 */
def mapEdges[NV](fun: Edge[K, EV] => NV): Graph[K, VV, NV]
```
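
The following sketch, under assumed sample data, shows that `mapEdges` replaces only the edge value: source and target IDs are left as they were. The object name `MapEdgesSketch` and the "strong"/"weak" labels are hypothetical and chosen only for illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object MapEdgesSketch {
  def run(): Set[(Long, Long, String)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = env.fromCollection(Seq(
      new Vertex(1L, "Alice"), new Vertex(2L, "Bob"), new Vertex(3L, "Charlie")))
    val edges = env.fromCollection(Seq(
      new Edge(1L, 2L, 0.5), new Edge(2L, 3L, 0.3), new Edge(1L, 3L, 0.8)))
    val graph = Graph.fromDataSet(vertices, edges, env)

    // The mapper receives the whole edge; here only the weight is used
    // to derive a String label, so the edge value type becomes String.
    val labelled: Graph[Long, String, String] = graph.mapEdges(
      (e: Edge[Long, Double]) => if (e.getValue >= 0.5) "strong" else "weak")

    // Collect (source, target, value) triples for inspection.
    labelled.getEdges.collect()
      .map(e => (e.getSource, e.getTarget, e.getValue))
      .toSet
  }
}
```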

### ID and Value Translation

Transform vertex and edge identifiers or values using translation functions.

```scala { .api }
/**
 * Translate vertex and edge IDs using the given TranslateFunction.
 * @param translator implements conversion from K to NEW
 * @return graph with translated vertex and edge IDs
 */
def translateGraphIds[NEW](translator: TranslateFunction[K, NEW]): Graph[NEW, VV, EV]

/**
 * Translate vertex and edge IDs using the given function.
 * @param fun implements conversion from K to NEW; its second argument
 *            is an optional reusable output object
 * @return graph with translated vertex and edge IDs
 */
def translateGraphIds[NEW](fun: (K, NEW) => NEW): Graph[NEW, VV, EV]

/**
 * Translate vertex values using the given TranslateFunction.
 * @param translator implements conversion from VV to NEW
 * @return graph with translated vertex values
 */
def translateVertexValues[NEW](translator: TranslateFunction[VV, NEW]): Graph[K, NEW, EV]

/**
 * Translate vertex values using the given function.
 * @param fun implements conversion from VV to NEW; its second argument
 *            is an optional reusable output object
 * @return graph with translated vertex values
 */
def translateVertexValues[NEW](fun: (VV, NEW) => NEW): Graph[K, NEW, EV]

/**
 * Translate edge values using the given TranslateFunction.
 * @param translator implements conversion from EV to NEW
 * @return graph with translated edge values
 */
def translateEdgeValues[NEW](translator: TranslateFunction[EV, NEW]): Graph[K, VV, NEW]

/**
 * Translate edge values using the given function.
 * @param fun implements conversion from EV to NEW; its second argument
 *            is an optional reusable output object
 * @return graph with translated edge values
 */
def translateEdgeValues[NEW](fun: (EV, NEW) => NEW): Graph[K, VV, NEW]
```

### Graph Filtering

Create subgraphs by applying filter predicates to vertices and edges.

```scala { .api }
/**
 * Apply filtering functions to the graph and return a sub-graph that
 * satisfies the predicates for both vertices and edges.
 * @param vertexFilter the filter function for vertices.
 * @param edgeFilter the filter function for edges.
 * @return the resulting sub-graph.
 */
def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]],
             edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]

/**
 * Apply filtering functions to the graph and return a sub-graph that
 * satisfies the predicates for both vertices and edges.
 * @param vertexFilterFun the filter function for vertices.
 * @param edgeFilterFun the filter function for edges.
 * @return the resulting sub-graph.
 */
def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean,
             edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the vertices.
 * @param vertexFilter the filter function for vertices.
 * @return the resulting sub-graph.
 */
def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the vertices.
 * @param vertexFilterFun the filter function for vertices.
 * @return the resulting sub-graph.
 */
def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the edges.
 * @param edgeFilter the filter function for edges.
 * @return the resulting sub-graph.
 */
def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the edges.
 * @param edgeFilterFun the filter function for edges.
 * @return the resulting sub-graph.
 */
def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]
```
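
The two single-predicate filters differ in what they do to the other element type. Filtering on vertices also drops every edge that loses an endpoint, while filtering on edges keeps all vertices, including ones left without edges. The sketch below illustrates this on assumed sample data; the object name `FilterSketch` and its helper methods are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object FilterSketch {
  private def sampleGraph(env: ExecutionEnvironment): Graph[Long, String, Double] = {
    val vertices = env.fromCollection(Seq(
      new Vertex(1L, "Alice"), new Vertex(2L, "Bob"), new Vertex(3L, "Charlie")))
    val edges = env.fromCollection(Seq(
      new Edge(1L, 2L, 0.5), new Edge(2L, 3L, 0.3), new Edge(1L, 3L, 0.8)))
    Graph.fromDataSet(vertices, edges, env)
  }

  // Summarises a graph as (vertex IDs, (source, target) pairs).
  private def summary(g: Graph[Long, String, Double]): (Set[Long], Set[(Long, Long)]) = {
    val ids = g.getVertices.collect().map(_.getId).toSet
    val pairs = g.getEdges.collect().map(e => (e.getSource, e.getTarget)).toSet
    (ids, pairs)
  }

  // Keeps vertices whose name is longer than three characters ("Bob" is removed).
  def byVertex(): (Set[Long], Set[(Long, Long)]) = {
    val graph = sampleGraph(ExecutionEnvironment.getExecutionEnvironment)
    summary(graph.filterOnVertices((v: Vertex[Long, String]) => v.getValue.length > 3))
  }

  // Keeps edges with weight above 0.4 (the 2 -> 3 edge is removed).
  def byEdge(): (Set[Long], Set[(Long, Long)]) = {
    val graph = sampleGraph(ExecutionEnvironment.getExecutionEnvironment)
    summary(graph.filterOnEdges((e: Edge[Long, Double]) => e.getValue > 0.4))
  }
}
```

In `byVertex`, removing vertex 2 also removes the edges 1 -> 2 and 2 -> 3, leaving only 1 -> 3. In `byEdge`, all three vertices remain.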

### Graph Structure Modifications

Transform the structural properties of the graph.

```scala { .api }
/**
 * This operation adds all inverse-direction edges to the graph.
 * @return the undirected graph.
 */
def getUndirected(): Graph[K, VV, EV]

/**
 * Reverse the direction of the edges in the graph.
 * @return a new graph with all edges reversed
 */
def reverse(): Graph[K, VV, EV]
```
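
As a rough illustration of what these two calls do to the edge set, the sketch below collects the (source, target) pairs before and after each operation. The object name `StructureSketch` and the sample data are assumptions made for the example.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object StructureSketch {
  private def sampleGraph(env: ExecutionEnvironment): Graph[Long, String, Double] = {
    val vertices = env.fromCollection(Seq(
      new Vertex(1L, "Alice"), new Vertex(2L, "Bob"), new Vertex(3L, "Charlie")))
    val edges = env.fromCollection(Seq(
      new Edge(1L, 2L, 0.5), new Edge(2L, 3L, 0.3), new Edge(1L, 3L, 0.8)))
    Graph.fromDataSet(vertices, edges, env)
  }

  // Collects the (source, target) pairs of a graph's edges.
  private def pairs(g: Graph[Long, String, Double]): Set[(Long, Long)] =
    g.getEdges.collect().map(e => (e.getSource, e.getTarget)).toSet

  // Every edge a -> b becomes b -> a.
  def reversedPairs(): Set[(Long, Long)] =
    pairs(sampleGraph(ExecutionEnvironment.getExecutionEnvironment).reverse())

  // Every edge a -> b is kept and its inverse b -> a is added.
  def undirectedPairs(): Set[(Long, Long)] =
    pairs(sampleGraph(ExecutionEnvironment.getExecutionEnvironment).getUndirected())
}
```

The graph is still stored as directed edges after `getUndirected()`; it simply contains both directions for each original edge.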

**Usage Examples:**

```scala
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Create sample graph
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob"),
  new Vertex(3L, "Charlie_longname")
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5),
  new Edge(2L, 3L, 0.3),
  new Edge(1L, 3L, 0.8)
))

val graph = Graph.fromDataSet(vertices, edges, env)

// Vertex transformations
val upperCaseGraph = graph.mapVertices(v => v.getValue.toUpperCase)
val lengthGraph = graph.mapVertices(_.getValue.length)

// Edge transformations
val doubledWeights = graph.mapEdges(e => e.getValue * 2.0)
val binaryEdges = graph.mapEdges(e => if (e.getValue > 0.5) 1 else 0)

// Filtering operations
val longNameVertices = graph.filterOnVertices(_.getValue.length > 5)
val strongEdges = graph.filterOnEdges(_.getValue > 0.4)
val filteredSubgraph = graph.subgraph(
  vertexFilterFun = _.getValue.length <= 10,
  edgeFilterFun = _.getValue >= 0.3
)

// Structure modifications
val undirectedGraph = graph.getUndirected()
val reversedGraph = graph.reverse()

// ID translation (e.g., Long to String).
// `reuse` is an optional reusable output object; it is ignored here.
val stringIdGraph = graph.translateGraphIds[String]((longId, reuse) => longId.toString)

// Value translation (vertex values from String to Int)
val intValueGraph = graph.translateVertexValues[Int]((stringValue, reuse) => stringValue.length)
```
220
221
### Advanced Filtering Patterns
222
223
More sophisticated filtering examples for complex graph analysis scenarios.
224
225
**Usage Examples:**
226
227
```scala
228
// Multi-criteria vertex filtering
229
val complexVertexFilter = graph.filterOnVertices { vertex =>
230
val name = vertex.getValue
231
name.length > 3 && name.startsWith("A")
232
}
233
234
// Edge filtering based on vertex relationships
235
val edgeFilterWithThreshold = graph.filterOnEdges { edge =>
236
edge.getValue > 0.5 && edge.getSource != edge.getTarget
237
}
238
239
// Combined subgraph filtering
240
val analyticsSubgraph = graph.subgraph(
241
vertexFilterFun = vertex => {
242
val name = vertex.getValue
243
name.contains("a") || name.contains("A")
244
},
245
edgeFilterFun = edge => {
246
edge.getValue >= 0.4
247
}
248
)
249
250
// Chaining transformations
251
val processedGraph = graph
252
.mapVertices(_.getValue.toLowerCase.trim)
253
.filterOnVertices(_.getValue.nonEmpty)
254
.mapEdges(e => math.round(e.getValue * 100) / 100.0)
255
.filterOnEdges(_.getValue > 0.0)
256
```

### Type Safety and Performance

The transformation operations share the following properties:

- **Type preservation**: Each operation's return type records which type parameter changed. For example, `mapVertices[NV]` returns `Graph[K, NV, EV]`, while the filter operations return `Graph[K, VV, EV]` unchanged.
- **Lazy evaluation**: Transformations are lazily evaluated within Flink's execution model; they only describe the computation, which runs once a result is requested
- **Distributed processing**: All operations are automatically distributed across the Flink cluster
- **Functional composition**: Operations can be chained together for complex processing pipelines

Every transformation method returns a new `Graph` instance and leaves the graph it was called on unchanged, so intermediate graphs can be reused safely in several pipelines.
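
The properties above can be sketched in a short pipeline. The type annotations are checked by the compiler at each step, nothing is executed until `collect()` is called, and the original graph still holds its original values afterwards. The object name `PipelineSketch` and the sample data are assumptions made for the example.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

object PipelineSketch {
  def run(): (Set[String], Set[Int], Set[Boolean]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val vertices = env.fromCollection(Seq(new Vertex(1L, "Alice"), new Vertex(2L, "Bob")))
    val edges = env.fromCollection(Seq(new Edge(1L, 2L, 0.5), new Edge(2L, 1L, 0.3)))
    val graph: Graph[Long, String, Double] = Graph.fromDataSet(vertices, edges, env)

    // Vertex value type changes from String to Int; K and EV are untouched.
    val byLength: Graph[Long, Int, Double] =
      graph.mapVertices((v: Vertex[Long, String]) => v.getValue.length)

    // Edge value type changes from Double to Boolean; K and VV are untouched.
    val flagged: Graph[Long, Int, Boolean] =
      byLength.mapEdges((e: Edge[Long, Double]) => e.getValue > 0.4)

    // Up to this point only a plan has been described.
    // Each collect() call below triggers an execution.
    val originalNames = graph.getVertices.collect().map(_.getValue).toSet
    val lengths = flagged.getVertices.collect().map(_.getValue).toSet
    val flags = flagged.getEdges.collect().map(_.getValue).toSet

    (originalNames, lengths, flags)
  }
}
```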