Complete API for creating graphs from various data sources.
The Graph companion object provides multiple factory methods for creating graphs from different data sources:
def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
vertices: DataSet[Vertex[K, VV]],
edges: DataSet[Edge[K, EV]],
env: ExecutionEnvironment
): Graph[K, VV, EV]

Creates a graph from separate vertex and edge DataSets.
Parameters:
vertices - DataSet containing graph vertices with IDs and values
edges - DataSet containing graph edges with source, target, and values
env - Flink execution environment

def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: DataSet[Edge[K, EV]],
env: ExecutionEnvironment
): Graph[K, NullValue, EV]

Creates a graph from edges only. Vertices are created automatically from the edge endpoints, with NullValue as the vertex value.
Parameters:
edges - DataSet containing graph edges
env - Flink execution environment

def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: DataSet[Edge[K, EV]],
vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment
): Graph[K, VV, EV]

Creates a graph from edges and initializes vertex values using a mapping function.
Parameters:
edges - DataSet containing graph edges
vertexValueInitializer - Function to initialize vertex values from vertex IDs
env - Flink execution environment

def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
vertices: Seq[Vertex[K, VV]],
edges: Seq[Edge[K, EV]],
env: ExecutionEnvironment
): Graph[K, VV, EV]

Creates a graph from Scala collections of vertices and edges.

def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: Seq[Edge[K, EV]],
env: ExecutionEnvironment
): Graph[K, NullValue, EV]

Creates a graph from a collection of edges only.

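A minimal sketch of the edges-only variant, assuming a Flink release that still ships `flink-gelly-scala`. The object name `EdgesOnlyExample`, the `build` helper, and the sample edges are hypothetical and exist only for illustration.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

// Hypothetical example object; not part of the Gelly API.
object EdgesOnlyExample {

  // Builds a graph from edges alone, so the vertex value type is NullValue.
  def build(env: ExecutionEnvironment): Graph[Long, NullValue, Double] = {
    // Illustrative sample data: a triangle over vertex IDs 1, 2 and 3.
    val edges: Seq[Edge[Long, Double]] = Seq(
      new Edge(1L, 2L, 0.5),
      new Edge(2L, 3L, 1.5),
      new Edge(3L, 1L, 2.5)
    )
    Graph.fromCollection(edges, env)
  }

  def main(args: Array[String]): Unit = {
    val graph = build(ExecutionEnvironment.getExecutionEnvironment)
    // Vertex IDs are taken from the edge endpoints.
    println(s"vertices: ${graph.numberOfVertices()}, edges: ${graph.numberOfEdges()}")
  }
}
```

The explicit `Graph[Long, NullValue, Double]` return type is optional, but it makes the NullValue vertex type visible at the call site.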
def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: Seq[Edge[K, EV]],
vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment
): Graph[K, VV, EV]

Creates a graph from edges with vertex value initialization.

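The initializer variant might be used roughly as follows. This is a sketch under assumptions: `InitializerExample`, `LabelFromId`, and the sample edges are hypothetical names and data, and the labelling rule is arbitrary.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala.Graph

// Hypothetical example object; not part of the Gelly API.
object InitializerExample {

  // Hypothetical initializer: derives a String label from each vertex ID.
  class LabelFromId extends MapFunction[Long, String] {
    override def map(id: Long): String = s"vertex-$id"
  }

  // Builds a graph whose vertex values are computed from the vertex IDs.
  def build(env: ExecutionEnvironment): Graph[Long, String, Double] = {
    // Illustrative sample data: a path 1 -> 2 -> 3.
    val edges: Seq[Edge[Long, Double]] = Seq(
      new Edge(1L, 2L, 1.0),
      new Edge(2L, 3L, 2.0)
    )
    Graph.fromCollection(edges, new LabelFromId, env)
  }

  def main(args: Array[String]): Unit = {
    val graph = build(ExecutionEnvironment.getExecutionEnvironment)
    // Print each vertex ID together with its initialized value.
    graph.getVertices.collect().foreach(v => println(s"${v.getId} -> ${v.getValue}"))
  }
}
```

A named class is used for the initializer here rather than an anonymous one, on the assumption that it keeps the output type easier for Flink to determine. The same `MapFunction` can be passed to the `fromDataSet`, `fromTupleDataSet`, and `fromTuple2DataSet` overloads that accept a `vertexValueInitializer`.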
def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
vertices: DataSet[(K, VV)],
edges: DataSet[(K, K, EV)],
env: ExecutionEnvironment
): Graph[K, VV, EV]

Creates a graph from tuple DataSets where:
vertices - (vertexId, vertexValue) tuples
edges - (sourceId, targetId, edgeValue) tuples

def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: DataSet[(K, K, EV)],
env: ExecutionEnvironment
): Graph[K, NullValue, EV]

Creates a graph from edge tuples only.

def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: DataSet[(K, K, EV)],
vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment
): Graph[K, VV, EV]

Creates a graph from edge tuples with vertex value initialization.

def fromTuple2DataSet[K: TypeInformation : ClassTag](
edges: DataSet[(K, K)],
env: ExecutionEnvironment
): Graph[K, NullValue, NullValue]

Creates a graph from simple edge pairs with no values.

def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag](
edges: DataSet[(K, K)],
vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment
): Graph[K, VV, NullValue]

Creates a graph from edge pairs with vertex value initialization.

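For plain (sourceId, targetId) pairs, a sketch along these lines may help. The object name `Tuple2Example`, the `build` helper, and the sample pairs are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

// Hypothetical example object; not part of the Gelly API.
object Tuple2Example {

  // Builds a graph with neither vertex values nor edge values.
  def build(env: ExecutionEnvironment): Graph[Long, NullValue, NullValue] = {
    // Illustrative (sourceId, targetId) pairs.
    val pairs: DataSet[(Long, Long)] =
      env.fromElements((1L, 2L), (2L, 3L), (3L, 1L), (3L, 4L))
    Graph.fromTuple2DataSet(pairs, env)
  }

  def main(args: Array[String]): Unit = {
    val graph = build(ExecutionEnvironment.getExecutionEnvironment)
    // Both vertex and edge values are NullValue, so only the topology is printed.
    graph.getEdges.collect().foreach(e => println(s"${e.getSource} -> ${e.getTarget}"))
  }
}
```

This form suits input such as an edge list where only connectivity matters; values can be attached later if needed.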
def fromCsvReader[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
env: ExecutionEnvironment,
pathEdges: String,
pathVertices: String = null,
lineDelimiterVertices: String = "\n",
fieldDelimiterVertices: String = ",",
quoteCharacterVertices: Character = null,
ignoreFirstLineVertices: Boolean = false,
ignoreCommentsVertices: String = null,
lenientVertices: Boolean = false,
includedFieldsVertices: Array[Int] = null,
lineDelimiterEdges: String = "\n",
fieldDelimiterEdges: String = ",",
quoteCharacterEdges: Character = null,
ignoreFirstLineEdges: Boolean = false,
ignoreCommentsEdges: String = null,
lenientEdges: Boolean = false,
includedFieldsEdges: Array[Int] = null,
vertexValueInitializer: MapFunction[K, VV] = null
): Graph[K, VV, EV]

Creates a graph from CSV files with extensive configuration options.
Parameters:
env - Flink execution environment
pathEdges - File path containing the edges (required)
pathVertices - File path containing the vertices (optional)
lineDelimiterVertices - Line separator for vertices file (default: "\n")
fieldDelimiterVertices - Field separator for vertices file (default: ",")
quoteCharacterVertices - Quote character for vertices file parsing
ignoreFirstLineVertices - Whether to skip first line in vertices file
ignoreCommentsVertices - String prefix for comment lines to ignore in vertices file
lenientVertices - Whether to silently ignore malformed lines in vertices file
includedFieldsVertices - Array of field indices to read from vertices file
lineDelimiterEdges - Line separator for edges file (default: "\n")
fieldDelimiterEdges - Field separator for edges file (default: ",")
quoteCharacterEdges - Quote character for edges file parsing
ignoreFirstLineEdges - Whether to skip first line in edges file
ignoreCommentsEdges - String prefix for comment lines to ignore in edges file
lenientEdges - Whether to silently ignore malformed lines in edges file
includedFieldsEdges - Array of field indices to read from edges file
vertexValueInitializer - Function to initialize vertex values if no vertices file is provided

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
val env = ExecutionEnvironment.getExecutionEnvironment
// From collections
val vertices = List(
new Vertex(1L, "Node A"),
new Vertex(2L, "Node B"),
new Vertex(3L, "Node C")
)
val edges = List(
new Edge(1L, 2L, 1.0),
new Edge(2L, 3L, 2.0),
new Edge(1L, 3L, 3.0)
)
val graph = Graph.fromCollection(vertices, edges, env)

// From tuple DataSets
val vertexTuples = env.fromCollection(List(
(1L, "A"),
(2L, "B"),
(3L, "C")
))
val edgeTuples = env.fromCollection(List(
(1L, 2L, 1.0),
(2L, 3L, 2.0)
))
val tupleGraph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

// From CSV files
val csvGraph = Graph.fromCsvReader[Long, String, Double](
env = env,
pathEdges = "path/to/edges.csv",
pathVertices = "path/to/vertices.csv",
fieldDelimiterEdges = "\t",
ignoreFirstLineEdges = true
)
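The usage examples above cover collections, tuple DataSets, and CSV input. A comparable sketch for `fromDataSet`, assuming the vertices and edges are already available as DataSets, could look like this. The object name `FromDataSetExample`, the `build` helper, and the sample data are hypothetical.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala.Graph

// Hypothetical example object; not part of the Gelly API.
object FromDataSetExample {

  // Builds a graph from separate vertex and edge DataSets.
  def build(env: ExecutionEnvironment): Graph[Long, String, Double] = {
    // Illustrative sample data; in practice these DataSets would come from
    // earlier transformations or input formats.
    val vertexSeq: Seq[Vertex[Long, String]] = Seq(
      new Vertex(1L, "Node A"),
      new Vertex(2L, "Node B"),
      new Vertex(3L, "Node C")
    )
    val edgeSeq: Seq[Edge[Long, Double]] = Seq(
      new Edge(1L, 2L, 1.0),
      new Edge(2L, 3L, 2.0)
    )
    val vertices: DataSet[Vertex[Long, String]] = env.fromCollection(vertexSeq)
    val edges: DataSet[Edge[Long, Double]] = env.fromCollection(edgeSeq)
    Graph.fromDataSet(vertices, edges, env)
  }

  def main(args: Array[String]): Unit = {
    val graph = build(ExecutionEnvironment.getExecutionEnvironment)
    println(s"vertices: ${graph.numberOfVertices()}, edges: ${graph.numberOfEdges()}")
  }
}
```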