
# Data Integration

Operations for joining graphs with external datasets, converting between different data representations, and integrating graph processing with the broader Flink ecosystem.

## Capabilities

### Graph-Dataset Joins

Join graph vertices and edges with external datasets to enrich or update vertex and edge values based on external information.

#### Vertex Joins

Join vertex data with external datasets using vertex IDs as join keys.

```scala { .api }
/**
 * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
 * a user-defined transformation on the values of the matched records.
 * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
 * @param inputDataSet the Tuple2 DataSet to join with.
 * The first field of the Tuple2 is used as the join key and the second field is passed
 * as a parameter to the transformation function.
 * @param vertexJoinFunction the transformation function to apply.
 * The first parameter is the current vertex value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @return a new Graph, where the vertex values have been updated according to the
 * result of the vertexJoinFunction.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 */
def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
                        vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV]

/**
 * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
 * a user-defined transformation on the values of the matched records.
 * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
 * @param inputDataSet the Tuple2 DataSet to join with.
 * The first field of the Tuple2 is used as the join key and the second field is passed
 * as a parameter to the transformation function.
 * @param fun the transformation function to apply.
 * The first parameter is the current vertex value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @return a new Graph, where the vertex values have been updated according to the
 * result of the vertexJoinFunction.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 */
def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
                        fun: (VV, T) => VV): Graph[K, VV, EV]
```
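
The matching rule is easier to see on plain collections. The following is a minimal sketch that models `joinWithVertices` with in-memory `Seq`s rather than Flink DataSets. `JoinWithVerticesModel` is a hypothetical name, and the sketch assumes that unmatched vertices keep their current value and that only the first input value per key is used.

```scala
// Plain-Scala model of joinWithVertices; not the Gelly implementation.
object JoinWithVerticesModel {
  // Vertices and input records are modelled as (id, value) pairs.
  def joinWithVertices[K, VV, T](
      vertices: Seq[(K, VV)],
      input: Seq[(K, T)]
  )(fun: (VV, T) => VV): Seq[(K, VV)] = {
    // Assumption: if a key occurs several times, only its first value is used.
    val firstByKey: Map[K, T] =
      input.groupBy(_._1).map { case (k, vs) => k -> vs.head._2 }

    vertices.map { case (id, value) =>
      firstByKey.get(id) match {
        case Some(extra) => (id, fun(value, extra)) // matched: transform the value
        case None        => (id, value)             // unmatched: value unchanged
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val vertices = Seq(1L -> "Alice", 2L -> "Bob", 3L -> "Charlie")
    val ages = Seq(1L -> 25, 2L -> 30)
    // Vertex 3 has no matching record and keeps its original value.
    println(joinWithVertices(vertices, ages)((name, age) => s"$name($age)"))
  }
}
```

The transformation returns the vertex value type `VV`, so a join can rewrite a value but cannot change its type.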

#### Edge Joins

Join edge data with external datasets using composite keys (source, target) or individual vertex IDs.

```scala { .api }
/**
 * Joins the edge DataSet with an input DataSet on the composite key of both
 * source and target IDs and applies a user-defined transformation on the values
 * of the matched records. The first two fields of the input DataSet are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first two fields of the Tuple3 are used as the composite join key
 * and the third field is passed as a parameter to the transformation function.
 * @param edgeJoinFunction the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple3 from the input DataSet.
 * @tparam T the type of the third field of the input Tuple3 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
                     edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]

/**
 * Joins the edge DataSet with an input DataSet on the composite key of both
 * source and target IDs and applies a user-defined transformation on the values
 * of the matched records. The first two fields of the input DataSet are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first two fields of the Tuple3 are used as the composite join key
 * and the third field is passed as a parameter to the transformation function.
 * @param fun the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple3 from the input DataSet.
 * @tparam T the type of the third field of the input Tuple3 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
                     fun: (EV, T) => EV): Graph[K, VV, EV]
```
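
As a rough illustration of the composite key, the sketch below models `joinWithEdges` on plain Scala collections. `JoinWithEdgesModel` is a hypothetical name, edges are represented as `(source, target, value)` tuples, and the handling of unmatched edges (left unchanged) is an assumption of this model.

```scala
// Plain-Scala model of joinWithEdges; not the Gelly implementation.
object JoinWithEdgesModel {
  def joinWithEdges[K, EV, T](
      edges: Seq[(K, K, EV)],
      input: Seq[(K, K, T)]
  )(fun: (EV, T) => EV): Seq[(K, K, EV)] = {
    // Composite key: the ordered pair (source, target).
    val byKey: Map[(K, K), T] =
      input.groupBy(t => (t._1, t._2)).map { case (k, vs) => k -> vs.head._3 }

    edges.map { case (src, trg, value) =>
      byKey.get((src, trg)) match {
        case Some(extra) => (src, trg, fun(value, extra)) // both endpoints match
        case None        => (src, trg, value)             // assumed: left as-is
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L, 0.5), (2L, 3L, 0.75))
    // (2, 1) does not match edge 1->2: the key is direction-sensitive.
    val updates = Seq((1L, 2L, 0.25), (2L, 1L, 9.0))
    println(joinWithEdges(edges, updates)(_ + _))
  }
}
```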

#### Source and Target Specific Edge Joins

Join edges based on either source or target vertex IDs only.

```scala { .api }
/**
 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 * on the values of the matched records.
 * The source ID of the edges input and the first field of the input DataSet
 * are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first field of the Tuple2 is used as the join key
 * and the second field is passed as a parameter to the transformation function.
 * @param edgeJoinFunction the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdgesOnSource[T](inputDataSet: DataSet[(K, T)],
                             edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]

/**
 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 * on the values of the matched records.
 * The source ID of the edges input and the first field of the input DataSet
 * are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first field of the Tuple2 is used as the join key
 * and the second field is passed as a parameter to the transformation function.
 * @param fun the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdgesOnSource[T](inputDataSet: DataSet[(K, T)],
                             fun: (EV, T) => EV): Graph[K, VV, EV]

/**
 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 * on the values of the matched records.
 * The target ID of the edges input and the first field of the input DataSet
 * are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first field of the Tuple2 is used as the join key
 * and the second field is passed as a parameter to the transformation function.
 * @param edgeJoinFunction the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdgesOnTarget[T](inputDataSet: DataSet[(K, T)],
                             edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]

/**
 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 * on the values of the matched records.
 * The target ID of the edges input and the first field of the input DataSet
 * are used as join keys.
 * @param inputDataSet the DataSet to join with.
 * The first field of the Tuple2 is used as the join key
 * and the second field is passed as a parameter to the transformation function.
 * @param fun the transformation function to apply.
 * The first parameter is the current edge value and the second parameter is the value
 * of the matched Tuple2 from the input DataSet.
 * @tparam T the type of the second field of the input Tuple2 DataSet.
 * @return a new Graph, where the edge values have been updated according to the
 * result of the edgeJoinFunction.
 */
def joinWithEdgesOnTarget[T](inputDataSet: DataSet[(K, T)],
                             fun: (EV, T) => EV): Graph[K, VV, EV]
```
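
Because these joins key on a single endpoint, one input record can update several edges at once. The sketch below models that difference on plain collections. `EdgeEndpointJoinModel` and its private helper `joinOn` are hypothetical names, and leaving unmatched edges unchanged is an assumption of the model.

```scala
// Plain-Scala model of the single-endpoint edge joins; not the Gelly implementation.
object EdgeEndpointJoinModel {
  // Shared logic: keyOf selects which endpoint of the edge is the join key.
  private def joinOn[K, EV, T](
      edges: Seq[(K, K, EV)],
      input: Seq[(K, T)],
      keyOf: ((K, K, EV)) => K
  )(fun: (EV, T) => EV): Seq[(K, K, EV)] = {
    val byKey: Map[K, T] =
      input.groupBy(_._1).map { case (k, vs) => k -> vs.head._2 }
    edges.map { e =>
      // Every edge sharing the key receives the same input value.
      byKey.get(keyOf(e)).fold(e)(extra => (e._1, e._2, fun(e._3, extra)))
    }
  }

  def joinWithEdgesOnSource[K, EV, T](edges: Seq[(K, K, EV)], input: Seq[(K, T)])(
      fun: (EV, T) => EV): Seq[(K, K, EV)] =
    joinOn(edges, input, (e: (K, K, EV)) => e._1)(fun)

  def joinWithEdgesOnTarget[K, EV, T](edges: Seq[(K, K, EV)], input: Seq[(K, T)])(
      fun: (EV, T) => EV): Seq[(K, K, EV)] =
    joinOn(edges, input, (e: (K, K, EV)) => e._2)(fun)

  def main(args: Array[String]): Unit = {
    val edges = Seq((1, 2, 10), (1, 3, 20), (2, 3, 30))
    println(joinWithEdgesOnSource(edges, Seq(1 -> 5))(_ + _)) // updates both edges leaving vertex 1
    println(joinWithEdgesOnTarget(edges, Seq(3 -> 5))(_ + _)) // updates both edges entering vertex 3
  }
}
```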

### Data Format Conversion

Convert between different data representations to integrate with various Flink operators and external systems.

#### Tuple Conversion

```scala { .api }
/**
 * @return the vertex DataSet as Tuple2.
 */
def getVerticesAsTuple2(): DataSet[(K, VV)]

/**
 * @return the edge DataSet as Tuple3.
 */
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
```

#### Triplet Access

```scala { .api }
/**
 * @return a DataSet of Triplets,
 * consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
 */
def getTriplets(): DataSet[Triplet[K, VV, EV]]
```
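
To show what a triplet contains, here is a minimal plain-Scala sketch that builds the five-field records from vertex and edge collections. The `TripletModel` object and its `Triplet` case class are hypothetical stand-ins for the Gelly type, and dropping edges whose endpoints have no vertex entry is an assumption of this model.

```scala
// Plain-Scala model of triplet construction; not the Gelly implementation.
object TripletModel {
  // Mirrors the field order documented for getTriplets().
  final case class Triplet[K, VV, EV](
      srcId: K, trgId: K, srcValue: VV, trgValue: VV, edgeValue: EV)

  def triplets[K, VV, EV](
      vertices: Seq[(K, VV)],
      edges: Seq[(K, K, EV)]
  ): Seq[Triplet[K, VV, EV]] = {
    val valueOf: Map[K, VV] = vertices.toMap
    edges.flatMap { case (src, trg, ev) =>
      // Look up both endpoint values; the edge is skipped if either is missing.
      for {
        srcValue <- valueOf.get(src)
        trgValue <- valueOf.get(trg)
      } yield Triplet(src, trg, srcValue, trgValue, ev)
    }
  }

  def main(args: Array[String]): Unit = {
    val vertices = Seq(1L -> "Alice", 2L -> "Bob")
    val edges = Seq((1L, 2L, 0.5))
    triplets(vertices, edges).foreach(println)
  }
}
```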

### Utility Mapper Classes

Pre-built mapper functions for common data transformations.

```scala { .api }
/**
 * Map function to convert (K, VV) tuples to Vertex[K, VV]
 */
class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
  def map(value: (K, VV)): Vertex[K, VV]
}

/**
 * Map function to convert (K, K, EV) tuples to Edge[K, EV]
 */
class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
  def map(value: (K, K, EV)): Edge[K, EV]
}

/**
 * Map function to convert Vertex[K, VV] to (K, VV) tuples
 */
class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
  def map(value: Vertex[K, VV]): (K, VV)
}

/**
 * Map function to convert Edge[K, EV] to (K, K, EV) tuples
 */
class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
  def map(value: Edge[K, EV]): (K, K, EV)
}
```

## Usage Examples

### Vertex Data Enrichment

```scala
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Create base graph
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob"),
  new Vertex(3L, "Charlie")
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5),
  new Edge(2L, 3L, 0.8)
))

val graph = Graph.fromDataSet(vertices, edges, env)

// External data to join with
val ageData = env.fromCollection(Seq(
  (1L, 25),
  (2L, 30),
  (3L, 35)
))

// Join vertex names with age data
val enrichedGraph = graph.joinWithVertices(ageData, (name: String, age: Int) => s"$name($age)")

// Result: vertices now have values like "Alice(25)", "Bob(30)", etc.
```

### Edge Weight Updates

```scala
// External edge weight updates
val weightUpdates = env.fromCollection(Seq(
  (1L, 2L, 0.3), // New weight for edge 1->2
  (2L, 3L, 0.9)  // New weight for edge 2->3
))

// Update edge weights by averaging with external data
val updatedGraph = graph.joinWithEdges(
  weightUpdates,
  (currentWeight: Double, newWeight: Double) => (currentWeight + newWeight) / 2.0
)
```

### Source-Specific Updates

```scala
// Update edges based on source vertex properties
val sourceData = env.fromCollection(Seq(
  (1L, 0.1), // Boost factor for edges from vertex 1
  (2L, 0.2)  // Boost factor for edges from vertex 2
))

val boostedGraph = graph.joinWithEdgesOnSource(
  sourceData,
  (edgeWeight: Double, boostFactor: Double) => edgeWeight * (1.0 + boostFactor)
)
```

### Data Format Conversion

```scala
// Convert graph data to tuple format for standard Flink operations
val vertexTuples = graph.getVerticesAsTuple2() // DataSet[(Long, String)]
val edgeTuples = graph.getEdgesAsTuple3() // DataSet[(Long, Long, Double)]

// Use with standard Flink transformations
val vertexNames = vertexTuples.map(_._2) // Extract just the names
val edgeWeights = edgeTuples.map(_._3) // Extract just the weights

// Get triplets for complex analysis
val triplets = graph.getTriplets()
val analyzedTriplets = triplets.map { triplet =>
  val srcValue = triplet.getSrcVertex.getValue
  val trgValue = triplet.getTrgVertex.getValue
  val edgeValue = triplet.getEdge.getValue

  (triplet.getSrcVertex.getId, srcValue, trgValue, edgeValue)
}
```

### Using Utility Mappers

```scala
import org.apache.flink.graph.scala.utils._

// Convert tuple DataSet to vertex DataSet
val rawVertexData = env.fromCollection(Seq((1L, "Alice"), (2L, "Bob")))
val vertexDataSet = rawVertexData.map(new Tuple2ToVertexMap[Long, String])

// Convert edge DataSet to tuple DataSet
val rawEdgeData = graph.getEdges
val edgeTuples = rawEdgeData.map(new EdgeToTuple3Map[Long, Double])
```

### Complex Data Integration Pipeline

```scala
// Complete data integration example
val externalVertexData = env.fromCollection(Seq(
  (1L, ("Alice", 25, "Engineer")),
  (2L, ("Bob", 30, "Manager")),
  (3L, ("Charlie", 35, "Analyst"))
))

val externalEdgeData = env.fromCollection(Seq(
  (1L, 2L, ("collaboration", 0.8)),
  (2L, 3L, ("supervision", 0.9))
))

// Transform external data to appropriate formats
val formattedVertexData = externalVertexData.map {
  case (id, (name, age, role)) => (id, s"$name-$role-$age")
}

val formattedEdgeData = externalEdgeData.map {
  case (src, tgt, (relation, strength)) => (src, tgt, strength)
}

// Join with graph
val integratedGraph = graph
  .joinWithVertices(formattedVertexData, (current: String, external: String) => external)
  .joinWithEdges(formattedEdgeData, (current: Double, external: Double) => external)

// Export results in different formats
val finalVertices = integratedGraph.getVerticesAsTuple2()
val finalEdges = integratedGraph.getEdgesAsTuple3()
val finalTriplets = integratedGraph.getTriplets()
```

### Integration with Flink Table API

```scala
// Convert to Table API format
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._

val tableEnv = TableEnvironment.getTableEnvironment(env)

// Convert graph data to tables
val vertexTable = tableEnv.fromDataSet(graph.getVerticesAsTuple2(), 'id, 'value)
val edgeTable = tableEnv.fromDataSet(graph.getEdgesAsTuple3(), 'source, 'target, 'weight)

// Query with Table API expressions: keep vertices whose outgoing edge weights sum to more than 1.0
val highDegreeVertices = vertexTable
  .join(edgeTable, 'id === 'source)
  .groupBy('id, 'value)
  .select('id, 'value, 'weight.sum as 'totalWeight)
  .filter('totalWeight > 1.0)
```

## Join Function Interfaces

```scala { .api }
// From Java Gelly - Join function interfaces
trait VertexJoinFunction[VV, T] {
  def vertexJoin(vertexValue: VV, inputValue: T): VV
}

trait EdgeJoinFunction[EV, T] {
  def edgeJoin(edgeValue: EV, inputValue: T): EV
}
```
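
When the join logic is reused or needs a name, it can live in a class rather than a lambda. The sketch below redeclares the two interfaces locally so that it compiles without Flink on the classpath; `JoinFunctionSketch`, `AppendAge`, and `Boost` are hypothetical names chosen for illustration.

```scala
object JoinFunctionSketch {
  // Local copies of the interfaces shown above, only to keep the sketch self-contained.
  trait VertexJoinFunction[VV, T] {
    def vertexJoin(vertexValue: VV, inputValue: T): VV
  }
  trait EdgeJoinFunction[EV, T] {
    def edgeJoin(edgeValue: EV, inputValue: T): EV
  }

  // Appends an age to a vertex label, e.g. "Alice" and 25 give "Alice(25)".
  final class AppendAge extends VertexJoinFunction[String, Int] {
    override def vertexJoin(vertexValue: String, inputValue: Int): String =
      s"$vertexValue($inputValue)"
  }

  // Scales an edge weight by (1 + boost factor).
  final class Boost extends EdgeJoinFunction[Double, Double] {
    override def edgeJoin(edgeValue: Double, inputValue: Double): Double =
      edgeValue * (1.0 + inputValue)
  }

  def main(args: Array[String]): Unit = {
    println(new AppendAge().vertexJoin("Alice", 25))
    println(new Boost().edgeJoin(2.0, 0.5))
  }
}
```

In a real job the local traits would be dropped in favour of the Gelly interfaces (`org.apache.flink.graph.VertexJoinFunction` and `org.apache.flink.graph.EdgeJoinFunction`), and an instance such as `new AppendAge` would be passed as the second argument of `joinWithVertices`.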

## Performance Considerations

- **Join Strategy**: Flink automatically optimizes join operations based on data size
- **Data Locality**: Consider partitioning strategies for large datasets
- **Memory Usage**: Monitor memory consumption when joining large external datasets
- **Serialization**: Use efficient data types for join keys and values
- **Caching**: Cache frequently accessed external datasets

Data integration capabilities enable seamless combination of graph processing with the broader Flink ecosystem and external data sources.