# Data Integration

Operations for joining graphs with external datasets, converting between different data representations, and integrating graph processing with the broader Flink ecosystem.

## Capabilities

### Graph-Dataset Joins

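Join graph vertices and edges with external datasets to enrich or update their values with external information. These joins change values only:

- No vertices or edges are added or removed.
- Vertices and edges without a matching key keep their current value.
- If a key occurs more than once in the input DataSet, only the first value encountered is used.

#### Vertex Joins

Join vertex data with external datasets using vertex IDs as join keys.
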
```scala { .api }
/**
 * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
 * a user-defined transformation on the values of the matched records.
 * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
 * @param inputDataSet the Tuple2 DataSet to join with.
 * The first field of the Tuple2 is used as the join key and the second field is passed
 * as a parameter to the transformation function.
 * @param vertexJoinFunction the transformation function to apply.
 * The first parameter is the current vertex value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @return a new Graph, where the vertex values have been updated according to the
 * result of the vertexJoinFunction.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 */
def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
                        vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV]

/**
 * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
 * a user-defined transformation on the values of the matched records.
 * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
 * @param inputDataSet the Tuple2 DataSet to join with.
 * The first field of the Tuple2 is used as the join key and the second field is passed
 * as a parameter to the transformation function.
 * @param fun the transformation function to apply.
 * The first parameter is the current vertex value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @return a new Graph, where the vertex values have been updated according to the
 * result of fun.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 */
def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
                        fun: (VV, T) => VV): Graph[K, VV, EV]
```
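
The matching rules are easiest to see on small in-memory data. The following plain-Scala sketch models the effect of `joinWithVertices` on vertex values. It uses `Seq`s of `(id, value)` pairs in place of Flink DataSets. `LocalVertexJoin` is a hypothetical name, not part of Gelly, and the model assumes each key occurs at most once in the input. Matched vertices take the value returned by the transformation function. Vertices without a matching key keep their current value.

```scala
object LocalVertexJoin {
  // Hypothetical in-memory model of joinWithVertices (not Gelly code):
  // vertices are (id, value) pairs, the external input is (id, T) pairs.
  def joinWithVertices[K, VV, T](vertices: Seq[(K, VV)],
                                 input: Seq[(K, T)],
                                 fun: (VV, T) => VV): Seq[(K, VV)] = {
    // Index the input by key; assumes keys in the input are unique.
    val byKey: Map[K, T] = input.toMap
    vertices.map { case (id, value) =>
      byKey.get(id) match {
        case Some(t) => (id, fun(value, t)) // matched: apply the transformation
        case None    => (id, value)         // unmatched: value is unchanged
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val vertices = Seq((1L, "Alice"), (2L, "Bob"), (3L, "Charlie"))
    val ages = Seq((1L, 25), (3L, 35)) // no record for vertex 2
    val joined = joinWithVertices(vertices, ages, (name: String, age: Int) => s"$name($age)")
    joined.foreach(println)
  }
}
```

Running `main` prints `(1,Alice(25))`, `(2,Bob)` and `(3,Charlie(35))`. Vertex 2 has no age record, so its name is left unchanged.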

#### Edge Joins

Join edge data with external datasets using composite keys (source, target) or individual vertex IDs.

```scala { .api }
/**
 * Joins the edge DataSet with an input DataSet on the composite key of both
 * source and target IDs and applies a user-defined transformation on the values
 * of the matched records. The first two fields of the input DataSet are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first two fields of the Tuple3 are used as the composite join key
 * and the third field is passed as a parameter to the transformation function.
 * @param edgeJoinFunction the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple3 from the input DataSet.
 * @tparam T the type of the third field of the input Tuple3 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
                     edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]

/**
 * Joins the edge DataSet with an input DataSet on the composite key of both
 * source and target IDs and applies a user-defined transformation on the values
 * of the matched records. The first two fields of the input DataSet are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first two fields of the Tuple3 are used as the composite join key
 * and the third field is passed as a parameter to the transformation function.
 * @param fun the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple3 from the input DataSet.
 * @tparam T the type of the third field of the input Tuple3 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of fun.
 */
def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
                     fun: (EV, T) => EV): Graph[K, VV, EV]
```
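
The composite key is the ordered pair (source ID, target ID), so an input record matches only the edge with exactly that direction. This plain-Scala sketch models `joinWithEdges` using in-memory triples. `LocalEdgeJoin` is a hypothetical name, not Gelly API, and the model assumes each `(source, target)` pair occurs at most once in the input.

```scala
object LocalEdgeJoin {
  // Hypothetical in-memory model of joinWithEdges (not Gelly code):
  // edges are (src, trg, value) triples, the input is keyed on (src, trg).
  def joinWithEdges[K, EV, T](edges: Seq[(K, K, EV)],
                              input: Seq[(K, K, T)],
                              fun: (EV, T) => EV): Seq[(K, K, EV)] = {
    // Assumes each (src, trg) pair appears at most once in the input.
    val byPair: Map[(K, K), T] = input.map { case (s, t, v) => ((s, t), v) }.toMap
    edges.map { case (src, trg, value) =>
      byPair.get((src, trg)) match {
        case Some(v) => (src, trg, fun(value, v)) // directional match on (src, trg)
        case None    => (src, trg, value)         // no match: value unchanged
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L, 0.5), (2L, 3L, 0.8))
    val updates = Seq((1L, 2L, 0.3), (2L, 1L, 0.9)) // (2, 1) is not edge 1 -> 2
    joinWithEdges(edges, updates, (cur: Double, upd: Double) => (cur + upd) / 2.0)
      .foreach(println)
  }
}
```

Here the record `(2L, 1L, 0.9)` matches no edge, because the graph contains only edges 1 -> 2 and 2 -> 3. Only the weight of edge 1 -> 2 is averaged.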
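
#### Source- and Target-Specific Edge Joins

Join edges on the source vertex ID alone or on the target vertex ID alone. Every edge whose source (or target) ID matches a key in the input DataSet is updated, so a single input record can change several edges.
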
```scala { .api }
/**
 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 * on the values of the matched records.
 * The source ID of the edges input and the first field of the input DataSet
 * are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first field of the Tuple2 is used as the join key
 * and the second field is passed as a parameter to the transformation function.
 * @param edgeJoinFunction the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdgesOnSource[T](inputDataSet: DataSet[(K, T)],
                             edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]

/**
 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 * on the values of the matched records.
 * The source ID of the edges input and the first field of the input DataSet
 * are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first field of the Tuple2 is used as the join key
 * and the second field is passed as a parameter to the transformation function.
 * @param fun the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of fun.
 */
def joinWithEdgesOnSource[T](inputDataSet: DataSet[(K, T)],
                             fun: (EV, T) => EV): Graph[K, VV, EV]

/**
 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 * on the values of the matched records.
 * The target ID of the edges input and the first field of the input DataSet
 * are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first field of the Tuple2 is used as the join key
 * and the second field is passed as a parameter to the transformation function.
 * @param edgeJoinFunction the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdgesOnTarget[T](inputDataSet: DataSet[(K, T)],
                             edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]

/**
 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 * on the values of the matched records.
 * The target ID of the edges input and the first field of the input DataSet
 * are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first field of the Tuple2 is used as the join key
 * and the second field is passed as a parameter to the transformation function.
 * @param fun the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of fun.
 */
def joinWithEdgesOnTarget[T](inputDataSet: DataSet[(K, T)],
                             fun: (EV, T) => EV): Graph[K, VV, EV]
```
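
Joining on a single endpoint means one input record can update every edge that leaves (or enters) a vertex. The plain-Scala sketch below models both variants using in-memory triples. `LocalEndpointJoin`, `onSource` and `onTarget` are hypothetical names, not Gelly API, and the model assumes unique keys in the input.

```scala
object LocalEndpointJoin {
  // Shared logic: `keyOf` picks the edge endpoint used as the join key.
  private def joinOnEndpoint[K, EV, T](edges: Seq[(K, K, EV)],
                                       input: Seq[(K, T)],
                                       keyOf: ((K, K, EV)) => K,
                                       fun: (EV, T) => EV): Seq[(K, K, EV)] = {
    val byKey: Map[K, T] = input.toMap // assumes unique keys in the input
    edges.map { case e @ (src, trg, value) =>
      byKey.get(keyOf(e)) match {
        case Some(t) => (src, trg, fun(value, t)) // matched: transform the edge value
        case None    => e                         // unmatched: edge is unchanged
      }
    }
  }

  // Models joinWithEdgesOnSource: the join key is the edge's source ID.
  def onSource[K, EV, T](edges: Seq[(K, K, EV)], input: Seq[(K, T)],
                         fun: (EV, T) => EV): Seq[(K, K, EV)] =
    joinOnEndpoint[K, EV, T](edges, input, _._1, fun)

  // Models joinWithEdgesOnTarget: the join key is the edge's target ID.
  def onTarget[K, EV, T](edges: Seq[(K, K, EV)], input: Seq[(K, T)],
                         fun: (EV, T) => EV): Seq[(K, K, EV)] =
    joinOnEndpoint[K, EV, T](edges, input, _._2, fun)

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L, 1.0), (1L, 3L, 2.0), (2L, 3L, 4.0))
    val boost = Seq((1L, 0.5))
    val scale = (w: Double, b: Double) => w * (1.0 + b)
    println(onSource(edges, boost, scale)) // both edges leaving vertex 1 are scaled
    println(onTarget(edges, boost, scale)) // no edge enters vertex 1: unchanged
  }
}
```

With the boost keyed on vertex 1, `onSource` scales both edges that leave vertex 1. `onTarget` leaves the edges unchanged because no edge points to vertex 1.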

### Data Format Conversion

Convert between different data representations to integrate with various Flink operators and external systems.

#### Tuple Conversion

```scala { .api }
/**
 * @return the vertex DataSet as Tuple2.
 */
def getVerticesAsTuple2(): DataSet[(K, VV)]

/**
 * @return the edge DataSet as Tuple3.
 */
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
```

#### Triplet Access

```scala { .api }
/**
 * @return a DataSet of Triplets,
 * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
 */
def getTriplets(): DataSet[Triplet[K, VV, EV]]
```
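
`getTriplets` pairs every edge with the values of both of its endpoints. The plain-Scala sketch below models that pairing using in-memory collections. The `LocalTriplets` object and the `TripletRow` case class are hypothetical stand-ins for Gelly's `Triplet`, with fields in the documented order. The sketch simply skips an edge whose endpoint is missing from the vertex set; it makes no claim about how Gelly handles such edges.

```scala
object LocalTriplets {
  // Hypothetical stand-in for Gelly's Triplet, fields in the documented order:
  // (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue).
  final case class TripletRow[K, VV, EV](srcId: K, trgId: K,
                                         srcValue: VV, trgValue: VV, edgeValue: EV)

  def triplets[K, VV, EV](vertices: Seq[(K, VV)],
                          edges: Seq[(K, K, EV)]): Seq[TripletRow[K, VV, EV]] = {
    val valueOf: Map[K, VV] = vertices.toMap
    // Look up both endpoint values for each edge; skip edges with a missing endpoint.
    edges.flatMap { case (src, trg, ev) =>
      for {
        sv <- valueOf.get(src)
        tv <- valueOf.get(trg)
      } yield TripletRow(src, trg, sv, tv, ev)
    }
  }

  def main(args: Array[String]): Unit = {
    val vertices = Seq((1L, "Alice"), (2L, "Bob"), (3L, "Charlie"))
    val edges = Seq((1L, 2L, 0.5), (2L, 3L, 0.8))
    triplets(vertices, edges).foreach(println)
  }
}
```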

### Utility Mapper Classes

Pre-built `MapFunction` implementations in `org.apache.flink.graph.scala.utils` for converting between Scala tuples and Gelly `Vertex` and `Edge` objects.

```scala { .api }
/**
 * Map function to convert (K, VV) tuples to Vertex[K, VV]
 */
class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
  def map(value: (K, VV)): Vertex[K, VV]
}

/**
 * Map function to convert (K, K, EV) tuples to Edge[K, EV]
 */
class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
  def map(value: (K, K, EV)): Edge[K, EV]
}

/**
 * Map function to convert Vertex[K, VV] to (K, VV) tuples
 */
class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
  def map(value: Vertex[K, VV]): (K, VV)
}

/**
 * Map function to convert Edge[K, EV] to (K, K, EV) tuples
 */
class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
  def map(value: Edge[K, EV]): (K, K, EV)
}
```
229
230
## Usage Examples
231
232
### Vertex Data Enrichment
233
234
```scala
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Create base graph
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob"),
  new Vertex(3L, "Charlie")
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5),
  new Edge(2L, 3L, 0.8)
))

val graph = Graph.fromDataSet(vertices, edges, env)

// External data to join with
val ageData = env.fromCollection(Seq(
  (1L, 25),
  (2L, 30),
  (3L, 35)
))

// Join vertex names with age data
val enrichedGraph = graph.joinWithVertices(ageData, (name: String, age: Int) => s"$name($age)")

// Result: vertices now have values like "Alice(25)", "Bob(30)", etc.
```

### Edge Weight Updates

```scala
// External edge weight updates
val weightUpdates = env.fromCollection(Seq(
  (1L, 2L, 0.3), // New weight for edge 1->2
  (2L, 3L, 0.9)  // New weight for edge 2->3
))

// Update edge weights by averaging with external data
val updatedGraph = graph.joinWithEdges(
  weightUpdates,
  (currentWeight: Double, newWeight: Double) => (currentWeight + newWeight) / 2.0
)
```

### Source-Specific Updates

```scala
// Update edges based on source vertex properties
val sourceData = env.fromCollection(Seq(
  (1L, 0.1), // Boost factor for edges from vertex 1
  (2L, 0.2)  // Boost factor for edges from vertex 2
))

val boostedGraph = graph.joinWithEdgesOnSource(
  sourceData,
  (edgeWeight: Double, boostFactor: Double) => edgeWeight * (1.0 + boostFactor)
)
```

### Data Format Conversion

```scala
// Convert graph data to tuple format for standard Flink operations
val vertexTuples = graph.getVerticesAsTuple2() // DataSet[(Long, String)]
val edgeTuples = graph.getEdgesAsTuple3()      // DataSet[(Long, Long, Double)]

// Use with standard Flink transformations
val vertexNames = vertexTuples.map(_._2) // Extract just the names
val edgeWeights = edgeTuples.map(_._3)   // Extract just the weights

// Get triplets for complex analysis
val triplets = graph.getTriplets()
val analyzedTriplets = triplets.map { triplet =>
  val srcValue = triplet.getSrcVertex.getValue
  val trgValue = triplet.getTrgVertex.getValue
  val edgeValue = triplet.getEdge.getValue

  (triplet.getSrcVertex.getId, srcValue, trgValue, edgeValue)
}
```

### Using Utility Mappers

```scala
import org.apache.flink.graph.scala.utils._

// Convert tuple DataSet to vertex DataSet
val rawVertexData = env.fromCollection(Seq((1L, "Alice"), (2L, "Bob")))
val vertexDataSet = rawVertexData.map(new Tuple2ToVertexMap[Long, String])

// Convert edge DataSet to tuple DataSet
// (getEdges is called without parentheses, which compiles whether or not it declares an empty parameter list)
val rawEdgeData = graph.getEdges
val edgeTuples = rawEdgeData.map(new EdgeToTuple3Map[Long, Double])
```

### Complex Data Integration Pipeline

```scala
// Complete data integration example
val externalVertexData = env.fromCollection(Seq(
  (1L, ("Alice", 25, "Engineer")),
  (2L, ("Bob", 30, "Manager")),
  (3L, ("Charlie", 35, "Analyst"))
))

val externalEdgeData = env.fromCollection(Seq(
  (1L, 2L, ("collaboration", 0.8)),
  (2L, 3L, ("supervision", 0.9))
))

// Transform external data to appropriate formats
val formattedVertexData = externalVertexData.map {
  case (id, (name, age, role)) => (id, s"$name-$role-$age")
}

val formattedEdgeData = externalEdgeData.map {
  case (src, tgt, (relation, strength)) => (src, tgt, strength)
}

// Join with graph
val integratedGraph = graph
  .joinWithVertices(formattedVertexData, (current: String, external: String) => external)
  .joinWithEdges(formattedEdgeData, (current: Double, external: Double) => external)

// Export results in different formats
val finalVertices = integratedGraph.getVerticesAsTuple2()
val finalEdges = integratedGraph.getEdgesAsTuple3()
val finalTriplets = integratedGraph.getTriplets()
```

### Integration with Flink Table API

```scala
// Convert to Table API format
// (uses the legacy DataSet-based Table API entry point of older Flink 1.x releases)
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._

val tableEnv = TableEnvironment.getTableEnvironment(env)

// Convert graph data to tables
val vertexTable = tableEnv.fromDataSet(graph.getVerticesAsTuple2(), 'id, 'value)
val edgeTable = tableEnv.fromDataSet(graph.getEdgesAsTuple3(), 'source, 'target, 'weight)

// Relational query with the Table API expression DSL (not SQL):
// vertices whose outgoing edges have a total weight above 1.0
val highWeightVertices = vertexTable
  .join(edgeTable, 'id === 'source)
  .groupBy('id, 'value)
  .select('id, 'value, 'weight.sum as 'totalWeight)
  .filter('totalWeight > 1.0)
```

## Join Function Interfaces

```scala { .api }
// Java Gelly interfaces (org.apache.flink.graph.VertexJoinFunction and
// org.apache.flink.graph.EdgeJoinFunction), shown in Scala notation
trait VertexJoinFunction[VV, T] {
  def vertexJoin(vertexValue: VV, inputValue: T): VV
}

trait EdgeJoinFunction[EV, T] {
  def edgeJoin(edgeValue: EV, inputValue: T): EV
}
```
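
Implementing the interface instead of passing a lambda lets you give the transformation a name and constructor parameters, and reuse it across joins. To run without Flink, the plain-Scala sketch below mirrors the `VertexJoinFunction` shape with a local trait, `LocalVertexJoinFunction`, which is hypothetical. The trait extends `Serializable` because Flink ships user functions to workers in serialized form. The sketch also shows one way a Scala function can be adapted to that shape. This models the relationship between the two `joinWithVertices` overloads; it is not Gelly's implementation.

```scala
object JoinFunctionSketch {
  // Local mirror of VertexJoinFunction[VV, T]; hypothetical, so no Flink dependency.
  trait LocalVertexJoinFunction[VV, T] extends Serializable {
    def vertexJoin(vertexValue: VV, inputValue: T): VV
  }

  // A named, parameterized implementation that can be reused across joins.
  final class AppendLabel(separator: String) extends LocalVertexJoinFunction[String, String] {
    override def vertexJoin(vertexValue: String, inputValue: String): String =
      vertexValue + separator + inputValue
  }

  // Adapts a plain Scala function to the interface shape.
  def fromFunction[VV, T](fun: (VV, T) => VV): LocalVertexJoinFunction[VV, T] =
    new LocalVertexJoinFunction[VV, T] {
      override def vertexJoin(vertexValue: VV, inputValue: T): VV = fun(vertexValue, inputValue)
    }

  def main(args: Array[String]): Unit = {
    val named = new AppendLabel("-")
    val adapted = fromFunction((name: String, role: String) => s"$name-$role")
    println(named.vertexJoin("Alice", "Engineer"))   // Alice-Engineer
    println(adapted.vertexJoin("Alice", "Engineer")) // Alice-Engineer
  }
}
```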

## Performance Considerations

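- **Join Strategy**: Flink's optimizer decides how each join is executed, such as which inputs are shuffled. It bases the choice on its estimates of input sizes and on any existing partitioning.
- **Data Locality**: Records are shuffled on the join key (the vertex ID, or the source and target IDs). For large datasets, an input that is already partitioned on that key can let the optimizer skip a redundant shuffle.
- **Memory Usage**: Joins against large external datasets sort or hash their inputs in Flink's managed memory. They spill to disk when that memory runs out, which slows the job. Monitor memory consumption for large inputs.
- **Serialization**: Prefer compact types for join keys and values (for example, `Long` IDs rather than strings or nested objects). These are cheaper to serialize, hash, and compare.
- **Reuse**: Define a frequently used external dataset once and feed it to every join that needs it, rather than reading it again for each join.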
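
The data-locality point rests on co-partitioning. If the graph data and the external DataSet are both hash-partitioned on the join key, every match can be found inside a single partition. The plain-Scala sketch below illustrates that property. `CoPartitionSketch` and `partitionOf` are hypothetical (Flink uses its own hash function), and the per-partition join assumes unique input keys.

```scala
object CoPartitionSketch {
  // Hypothetical hash partitioner; Flink's internal hashing differs.
  def partitionOf(key: Long, parallelism: Int): Int =
    java.lang.Math.floorMod(java.lang.Long.hashCode(key), parallelism)

  // Splits (key, value) records into buckets by the partition of their key.
  def partition[V](records: Seq[(Long, V)], parallelism: Int): Map[Int, Seq[(Long, V)]] =
    records.groupBy { case (k, _) => partitionOf(k, parallelism) }

  // Joins vertex names with ages partition by partition. Each partition only
  // looks at its own input records, which suffices because equal keys are
  // always routed to the same partition on both sides.
  def partitionedJoin(vertices: Seq[(Long, String)], input: Seq[(Long, Int)],
                      parallelism: Int): Set[(Long, String)] = {
    val vParts = partition(vertices, parallelism)
    val iParts = partition(input, parallelism)
    (0 until parallelism).flatMap { p =>
      val local: Map[Long, Int] = iParts.getOrElse(p, Seq.empty).toMap
      vParts.getOrElse(p, Seq.empty).map { case (id, name) =>
        (id, local.get(id).fold(name)(age => s"$name($age)"))
      }
    }.toSet
  }
}
```

Changing `parallelism` changes how records are spread across partitions but not the joined result, because matching records always land together.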
411
412
Data integration capabilities enable seamless combination of graph processing with the broader Flink ecosystem and external data sources.