Operations for joining graphs with external datasets, converting between different data representations, and integrating graph processing with the broader Flink ecosystem.
Join graph vertices and edges with external datasets to enrich or update vertex and edge values. These joins change values only: the returned Graph has the same vertex and edge sets as the original.
Join vertex data with external datasets using vertex IDs as join keys.
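As a mental model of these semantics, the following plain-Scala sketch applies `joinWithVertices`-style logic to local collections instead of a Flink DataSet. It assumes that a vertex with no matching ID keeps its current value, and that only one value is used when the input holds the same key more than once. The Gelly documentation notes that join methods consider only the first value encountered for a key; in a distributed job, which value arrives first is not guaranteed. `VertexJoinModel`, `V`, and `joinWithVertices` below are hypothetical names, not Gelly API.

```scala
object VertexJoinModel {
  // Hypothetical local stand-in for a Gelly Vertex: an ID and a value.
  final case class V[K, VV](id: K, value: VV)

  // Models joinWithVertices on local collections: each vertex ID is matched
  // against the first field of the input pairs.
  def joinWithVertices[K, VV, T](vertices: Seq[V[K, VV]],
                                 input: Seq[(K, T)],
                                 fun: (VV, T) => VV): Seq[V[K, VV]] = {
    // Index the input by key, keeping only the first value seen for each key.
    val lookup: Map[K, T] = input.foldLeft(Map.empty[K, T]) {
      case (acc, (k, t)) => if (acc.contains(k)) acc else acc + (k -> t)
    }
    vertices.map { v =>
      lookup.get(v.id) match {
        case Some(t) => v.copy(value = fun(v.value, t)) // matched: apply the join function
        case None    => v                               // unmatched: keep the current value
      }
    }
  }
}
```

Indexing the input once as a `Map` keeps the model simple; a real Flink job would instead co-group the two DataSets by key across workers.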
/**
* Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
* a user-defined transformation on the values of the matched records.
* The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
* @param inputDataSet the Tuple2 DataSet to join with.
* The first field of the Tuple2 is used as the join key and the second field is passed
* as a parameter to the transformation function.
* @param vertexJoinFunction the transformation function to apply.
* The first parameter is the current vertex value and the second parameter is the value
* of the matched Tuple2 from the input DataSet.
* @return a new Graph, where the vertex values have been updated according to the
* result of the vertexJoinFunction.
* @tparam T the type of the second field of the input Tuple2 DataSet.
*/
def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV]
/**
* Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
* a user-defined transformation on the values of the matched records.
* The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
* @param inputDataSet the Tuple2 DataSet to join with.
* The first field of the Tuple2 is used as the join key and the second field is passed
* as a parameter to the transformation function.
* @param fun the transformation function to apply.
* The first parameter is the current vertex value and the second parameter is the value
* of the matched Tuple2 from the input DataSet.
* @return a new Graph, where the vertex values have been updated according to the
* result of fun.
* @tparam T the type of the second field of the input Tuple2 DataSet.
*/
def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
fun: (VV, T) => VV): Graph[K, VV, EV]
Join edge data with external datasets using either the composite (source, target) key or a single vertex ID as the join key.
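The composite-key variant can be modeled the same way on local collections. In this hypothetical sketch (`EdgeJoinModel`, `E`, and `joinWithEdges` are illustrative names, not Gelly API), the input is indexed by its `(source, target)` pair, matched edges receive `fun(edgeValue, inputValue)`, and edges with no matching pair are assumed to pass through unchanged. Because the key is an ordered pair, an input row for `(3, 2)` does not match the edge `2 -> 3`.

```scala
object EdgeJoinModel {
  // Hypothetical local stand-in for a Gelly Edge: source ID, target ID, value.
  final case class E[K, EV](src: K, trg: K, value: EV)

  // Models joinWithEdges: (source, target) is the composite join key.
  def joinWithEdges[K, EV, T](edges: Seq[E[K, EV]],
                              input: Seq[(K, K, T)],
                              fun: (EV, T) => EV): Seq[E[K, EV]] = {
    // Index the input by its first two fields, keeping the first value per pair.
    val lookup: Map[(K, K), T] = input.foldLeft(Map.empty[(K, K), T]) {
      case (acc, (s, d, x)) => if (acc.contains((s, d))) acc else acc + ((s, d) -> x)
    }
    // Matched edges get fun(currentValue, inputValue); others pass through.
    edges.map(e => lookup.get((e.src, e.trg)).fold(e)(x => e.copy(value = fun(e.value, x))))
  }
}
```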
/**
* Joins the edge DataSet with an input DataSet on the composite key of both
* source and target IDs and applies a user-defined transformation on the values
* of the matched records. The first two fields of the input DataSet are used as join keys.
* @param inputDataSet the DataSet to join with.
* The first two fields of the Tuple3 are used as the composite join key
* and the third field is passed as a parameter to the transformation function.
* @param edgeJoinFunction the transformation function to apply.
* The first parameter is the current edge value and the second parameter is the value
* of the matched Tuple3 from the input DataSet.
* @tparam T the type of the third field of the input Tuple3 DataSet.
* @return a new Graph, where the edge values have been updated according to the
* result of the edgeJoinFunction.
*/
def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
/**
* Joins the edge DataSet with an input DataSet on the composite key of both
* source and target IDs and applies a user-defined transformation on the values
* of the matched records. The first two fields of the input DataSet are used as join keys.
* @param inputDataSet the DataSet to join with.
* The first two fields of the Tuple3 are used as the composite join key
* and the third field is passed as a parameter to the transformation function.
* @param fun the transformation function to apply.
* The first parameter is the current edge value and the second parameter is the value
* of the matched Tuple3 from the input DataSet.
* @tparam T the type of the third field of the input Tuple3 DataSet.
* @return a new Graph, where the edge values have been updated according to the
* result of fun.
*/
def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
fun: (EV, T) => EV): Graph[K, VV, EV]
Join edges on either the source or the target vertex ID alone.
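Joining on a single endpoint differs from the composite-key join in one important way: every edge that shares the joined vertex ID receives the same input value, so one input row can update many edges. The following local sketch models `joinWithEdgesOnSource` and `joinWithEdgesOnTarget`, assuming that unmatched edges keep their value and that only the first input value per key is used. `EndpointJoinModel`, `E`, and `joinOn` are hypothetical names, not Gelly API.

```scala
object EndpointJoinModel {
  // Hypothetical local stand-in for a Gelly Edge.
  final case class E[K, EV](src: K, trg: K, value: EV)

  // Shared logic: `key` picks which endpoint ID of an edge is joined on.
  private def joinOn[K, EV, T](edges: Seq[E[K, EV]],
                               input: Seq[(K, T)],
                               key: E[K, EV] => K,
                               fun: (EV, T) => EV): Seq[E[K, EV]] = {
    // Index the input by key; the first value per key wins.
    val lookup: Map[K, T] = input.foldLeft(Map.empty[K, T]) {
      case (acc, (k, x)) => if (acc.contains(k)) acc else acc + (k -> x)
    }
    // Every edge whose selected endpoint matches receives the same input value.
    edges.map(e => lookup.get(key(e)).fold(e)(x => e.copy(value = fun(e.value, x))))
  }

  def joinWithEdgesOnSource[K, EV, T](edges: Seq[E[K, EV]], input: Seq[(K, T)],
                                      fun: (EV, T) => EV): Seq[E[K, EV]] =
    joinOn(edges, input, (e: E[K, EV]) => e.src, fun)

  def joinWithEdgesOnTarget[K, EV, T](edges: Seq[E[K, EV]], input: Seq[(K, T)],
                                      fun: (EV, T) => EV): Seq[E[K, EV]] =
    joinOn(edges, input, (e: E[K, EV]) => e.trg, fun)
}
```

Both public methods delegate to one helper that differs only in the key selector, which mirrors how the two Gelly methods share a signature and differ only in the join key.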
/**
* Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
* on the values of the matched records.
* The source ID of each edge and the first field of the input DataSet
* are used as the join keys.
* @param inputDataSet the DataSet to join with.
* The first field of the Tuple2 is used as the join key
* and the second field is passed as a parameter to the transformation function.
* @param edgeJoinFunction the transformation function to apply.
* The first parameter is the current edge value and the second parameter is the value
* of the matched Tuple2 from the input DataSet.
* @tparam T the type of the second field of the input Tuple2 DataSet.
* @return a new Graph, where the edge values have been updated according to the
* result of the edgeJoinFunction.
*/
def joinWithEdgesOnSource[T](inputDataSet: DataSet[(K, T)],
edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
/**
* Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
* on the values of the matched records.
* The source ID of each edge and the first field of the input DataSet
* are used as the join keys.
* @param inputDataSet the DataSet to join with.
* The first field of the Tuple2 is used as the join key
* and the second field is passed as a parameter to the transformation function.
* @param fun the transformation function to apply.
* The first parameter is the current edge value and the second parameter is the value
* of the matched Tuple2 from the input DataSet.
* @tparam T the type of the second field of the input Tuple2 DataSet.
* @return a new Graph, where the edge values have been updated according to the
* result of fun.
*/
def joinWithEdgesOnSource[T](inputDataSet: DataSet[(K, T)],
fun: (EV, T) => EV): Graph[K, VV, EV]
/**
* Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
* on the values of the matched records.
* The target ID of each edge and the first field of the input DataSet
* are used as the join keys.
* @param inputDataSet the DataSet to join with.
* The first field of the Tuple2 is used as the join key
* and the second field is passed as a parameter to the transformation function.
* @param edgeJoinFunction the transformation function to apply.
* The first parameter is the current edge value and the second parameter is the value
* of the matched Tuple2 from the input DataSet.
* @tparam T the type of the second field of the input Tuple2 DataSet.
* @return a new Graph, where the edge values have been updated according to the
* result of the edgeJoinFunction.
*/
def joinWithEdgesOnTarget[T](inputDataSet: DataSet[(K, T)],
edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
/**
* Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
* on the values of the matched records.
* The target ID of each edge and the first field of the input DataSet
* are used as the join keys.
* @param inputDataSet the DataSet to join with.
* The first field of the Tuple2 is used as the join key
* and the second field is passed as a parameter to the transformation function.
* @param fun the transformation function to apply.
* The first parameter is the current edge value and the second parameter is the value
* of the matched Tuple2 from the input DataSet.
* @tparam T the type of the second field of the input Tuple2 DataSet.
* @return a new Graph, where the edge values have been updated according to the
* result of fun.
*/
def joinWithEdgesOnTarget[T](inputDataSet: DataSet[(K, T)],
fun: (EV, T) => EV): Graph[K, VV, EV]
Convert between different data representations to integrate with other Flink operators and external systems.
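A triplet combines an edge with the values of both of its endpoints. One way to picture `getTriplets()` is as two joins of the edge set against the vertex set, first on the source ID and then on the target ID. The sketch below models this with a local `Map`; `TripletModel`, `Trip`, and `triplets` are hypothetical names, the result uses the field order from the `getTriplets()` doc comment, and edges whose endpoints are missing from the vertex set are dropped, as an inner join would drop them.

```scala
object TripletModel {
  // Field order follows the getTriplets() doc comment:
  // (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
  type Trip[K, VV, EV] = (K, K, VV, VV, EV)

  // Models getTriplets() as two inner joins of the edges with the vertices.
  def triplets[K, VV, EV](vertices: Seq[(K, VV)],
                          edges: Seq[(K, K, EV)]): Seq[Trip[K, VV, EV]] = {
    val valueOf: Map[K, VV] = vertices.toMap
    for {
      (src, trg, ev) <- edges
      sv <- valueOf.get(src).toSeq // join on the source ID
      tv <- valueOf.get(trg).toSeq // join on the target ID
    } yield (src, trg, sv, tv, ev)
  }
}
```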
/**
* @return the vertex DataSet as Tuple2.
*/
def getVerticesAsTuple2(): DataSet[(K, VV)]
/**
* @return the edge DataSet as Tuple3.
*/
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
/**
* @return a DataSet of Triplets,
* consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
*/
def getTriplets(): DataSet[Triplet[K, VV, EV]]
Pre-built mapper functions for converting between Scala tuples and Gelly's Vertex and Edge types.
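The four mapper classes are field-by-field conversions, so converting a tuple to a vertex or edge and back should return the original tuple. The sketch below mirrors them with hypothetical case classes `V` and `E` standing in for Gelly's `Vertex` and `Edge`; `MapperModel` and its functions are illustrative names, and the sketch shows the field mapping only without using Flink.

```scala
object MapperModel {
  // Hypothetical stand-ins for Gelly's Vertex[K, VV] and Edge[K, EV].
  final case class V[K, VV](id: K, value: VV)
  final case class E[K, EV](src: K, trg: K, value: EV)

  // Counterparts of Tuple2ToVertexMap and VertexToTuple2Map.
  def tuple2ToVertex[K, VV](t: (K, VV)): V[K, VV] = V(t._1, t._2)
  def vertexToTuple2[K, VV](v: V[K, VV]): (K, VV) = (v.id, v.value)

  // Counterparts of Tuple3ToEdgeMap and EdgeToTuple3Map.
  def tuple3ToEdge[K, EV](t: (K, K, EV)): E[K, EV] = E(t._1, t._2, t._3)
  def edgeToTuple3[K, EV](e: E[K, EV]): (K, K, EV) = (e.src, e.trg, e.value)
}
```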
/**
* Map function to convert (K, VV) tuples to Vertex[K, VV]
*/
class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]] {
def map(value: (K, VV)): Vertex[K, VV]
}
/**
* Map function to convert (K, K, EV) tuples to Edge[K, EV]
*/
class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]] {
def map(value: (K, K, EV)): Edge[K, EV]
}
/**
* Map function to convert Vertex[K, VV] to (K, VV) tuples
*/
class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
def map(value: Vertex[K, VV]): (K, VV)
}
/**
* Map function to convert Edge[K, EV] to (K, K, EV) tuples
*/
class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
def map(value: Edge[K, EV]): (K, K, EV)
}
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._
val env = ExecutionEnvironment.getExecutionEnvironment
// Create base graph
val vertices = env.fromCollection(Seq(
new Vertex(1L, "Alice"),
new Vertex(2L, "Bob"),
new Vertex(3L, "Charlie")
))
val edges = env.fromCollection(Seq(
new Edge(1L, 2L, 0.5),
new Edge(2L, 3L, 0.8)
))
val graph = Graph.fromDataSet(vertices, edges, env)
// External data to join with
val ageData = env.fromCollection(Seq(
(1L, 25),
(2L, 30),
(3L, 35)
))
// Join vertex names with age data
val enrichedGraph = graph.joinWithVertices(ageData, (name: String, age: Int) => s"$name($age)")
// Result: vertices now have values like "Alice(25)", "Bob(30)", etc.
// External edge weight updates
val weightUpdates = env.fromCollection(Seq(
(1L, 2L, 0.3), // New weight for edge 1->2
(2L, 3L, 0.9) // New weight for edge 2->3
))
// Update edge weights by averaging with external data
val updatedGraph = graph.joinWithEdges(
weightUpdates,
(currentWeight: Double, newWeight: Double) => (currentWeight + newWeight) / 2.0
)
// Update edges based on source vertex properties
val sourceData = env.fromCollection(Seq(
(1L, 0.1), // Boost factor for edges from vertex 1
(2L, 0.2) // Boost factor for edges from vertex 2
))
val boostedGraph = graph.joinWithEdgesOnSource(
sourceData,
(edgeWeight: Double, boostFactor: Double) => edgeWeight * (1.0 + boostFactor)
)
// Convert graph data to tuple format for standard Flink operations
val vertexTuples = graph.getVerticesAsTuple2() // DataSet[(Long, String)]
val edgeTuples = graph.getEdgesAsTuple3() // DataSet[(Long, Long, Double)]
// Use with standard Flink transformations
val vertexNames = vertexTuples.map(_._2) // Extract just the names
val edgeWeights = edgeTuples.map(_._3) // Extract just the weights
// Get triplets: each edge together with its source and target vertices
val triplets = graph.getTriplets()
val analyzedTriplets = triplets.map { triplet =>
val srcValue = triplet.getSrcVertex.getValue
val trgValue = triplet.getTrgVertex.getValue
val edgeValue = triplet.getEdge.getValue
(triplet.getSrcVertex.getId, srcValue, trgValue, edgeValue)
}
import org.apache.flink.graph.scala.utils._
// Convert tuple DataSet to vertex DataSet
val rawVertexData = env.fromCollection(Seq((1L, "Alice"), (2L, "Bob")))
val vertexDataSet = rawVertexData.map(new Tuple2ToVertexMap[Long, String])
// Convert edge DataSet to tuple DataSet
val rawEdgeData = graph.getEdges()
val edgeTupleData = rawEdgeData.map(new EdgeToTuple3Map[Long, Double])
// Complete data integration example
val externalVertexData = env.fromCollection(Seq(
(1L, ("Alice", 25, "Engineer")),
(2L, ("Bob", 30, "Manager")),
(3L, ("Charlie", 35, "Analyst"))
))
val externalEdgeData = env.fromCollection(Seq(
(1L, 2L, ("collaboration", 0.8)),
(2L, 3L, ("supervision", 0.9))
))
// Reshape external data into the (id, value) and (source, target, value) tuples the join methods expect
val formattedVertexData = externalVertexData.map {
case (id, (name, age, role)) => (id, s"$name-$role-$age")
}
val formattedEdgeData = externalEdgeData.map {
case (src, tgt, (relation, strength)) => (src, tgt, strength)
}
// Join with graph
val integratedGraph = graph
.joinWithVertices(formattedVertexData, (current: String, external: String) => external)
.joinWithEdges(formattedEdgeData, (current: Double, external: Double) => external)
// Export results in different formats
val finalVertices = integratedGraph.getVerticesAsTuple2()
val finalEdges = integratedGraph.getEdgesAsTuple3()
val finalTriplets = integratedGraph.getTriplets()
// Convert to Table API format
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api._
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Convert graph data to tables
val vertexTable = tableEnv.fromDataSet(graph.getVerticesAsTuple2(), 'id, 'value)
val edgeTable = tableEnv.fromDataSet(graph.getEdgesAsTuple3(), 'source, 'target, 'weight)
// Query with the Table API: sum each vertex's outgoing edge weights and keep totals above 1.0
val heavyVertices = vertexTable
.join(edgeTable, 'id === 'source)
.groupBy('id, 'value)
.select('id, 'value, 'weight.sum as 'totalWeight)
.filter('totalWeight > 1.0)
// Join function interfaces from Java Gelly, shown here in Scala notation
trait VertexJoinFunction[VV, T] {
def vertexJoin(vertexValue: VV, inputValue: T): VV
}
trait EdgeJoinFunction[EV, T] {
def edgeJoin(edgeValue: EV, inputValue: T): EV
}
Together, the join methods and the tuple and triplet conversions let graph data be enriched from external DataSets and passed on to other Flink APIs, such as the Table API.