Scala API for Apache Flink's Gelly graph processing library providing distributed graph operations and algorithms
Complete API for creating graphs from various data sources.
The Graph companion object provides multiple factory methods for creating graphs from different data sources:
def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
vertices: DataSet[Vertex[K, VV]],
edges: DataSet[Edge[K, EV]],
env: ExecutionEnvironment
): Graph[K, VV, EV]
Creates a graph from separate vertex and edge DataSets.
Parameters:
vertices - DataSet containing graph vertices with IDs and values
edges - DataSet containing graph edges with source IDs, target IDs, and values
env - Flink execution environment

def fromDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: DataSet[Edge[K, EV]],
env: ExecutionEnvironment
): Graph[K, NullValue, EV]
Creates a graph from edges only. Vertices are created automatically from the edges' source and target IDs, with NullValue as their value.
Parameters:
edges - DataSet containing graph edges
env - Flink execution environment

def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: DataSet[Edge[K, EV]],
vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment
): Graph[K, VV, EV]
Creates a graph from edges and initializes each vertex value by applying a mapping function to the vertex ID.
Parameters:
edges - DataSet containing graph edges
vertexValueInitializer - Function that computes each vertex value from its vertex ID
env - Flink execution environment

def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
vertices: Seq[Vertex[K, VV]],
edges: Seq[Edge[K, EV]],
env: ExecutionEnvironment
): Graph[K, VV, EV]
Creates a graph from Scala collections of vertices and edges.
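The three fromDataSet overloads above differ only in where the vertex values come from. The following is a minimal sketch, not reference code: it assumes a local ExecutionEnvironment, and FromDataSetSketch and IdLabel are hypothetical names. The initializer is written as a named class because Scala 2.10, which this artifact targets, does not convert lambdas to Java SAM interfaces such as MapFunction.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

// Hypothetical initializer: derives a String label from each vertex ID.
class IdLabel extends MapFunction[Long, String] {
  override def map(id: Long): String = "v" + id
}

object FromDataSetSketch {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  // In-memory DataSets; a real job would typically read these from a source.
  val vertices: DataSet[Vertex[Long, String]] = env.fromCollection(Seq(
    new Vertex(1L, "v1"), new Vertex(2L, "v2"), new Vertex(3L, "v3")))
  val edges: DataSet[Edge[Long, Double]] = env.fromCollection(Seq(
    new Edge(1L, 2L, 0.5), new Edge(2L, 3L, 1.5)))

  // Overload 1: explicit vertex and edge DataSets.
  val full: Graph[Long, String, Double] = Graph.fromDataSet(vertices, edges, env)

  // Overload 2: edges only; vertices 1, 2 and 3 are derived with NullValue values.
  val edgesOnly: Graph[Long, NullValue, Double] = Graph.fromDataSet(edges, env)

  // Overload 3: edges plus an initializer applied to every derived vertex ID.
  val initialized: Graph[Long, String, Double] =
    Graph.fromDataSet(edges, new IdLabel, env)
}
```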
def fromCollection[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: Seq[Edge[K, EV]],
env: ExecutionEnvironment
): Graph[K, NullValue, EV]
Creates a graph from a collection of edges only.
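With only an edge collection, the vertex value type is fixed to NullValue, so the result is typed Graph[Long, NullValue, Double] here. A minimal sketch assuming a local environment; EdgeCollectionSketch is a hypothetical name.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

object EdgeCollectionSketch {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  // Three weighted edges; vertices 1, 2 and 3 are derived from their endpoints.
  val edges: Seq[Edge[Long, Double]] = Seq(
    new Edge(1L, 2L, 1.0),
    new Edge(2L, 3L, 2.0),
    new Edge(1L, 3L, 3.0))

  val graph: Graph[Long, NullValue, Double] = Graph.fromCollection(edges, env)
}
```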
def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: Seq[Edge[K, EV]],
vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment
): Graph[K, VV, EV]
Creates a graph from edges with vertex value initialization.
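A minimal sketch of the initializer variant, assuming a local environment. The hypothetical IdAsValue function seeds each vertex with its own ID; InitializedCollectionSketch is likewise an illustrative name.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala.Graph

// Hypothetical initializer: each vertex starts with its own ID as its value.
class IdAsValue extends MapFunction[Long, Long] {
  override def map(id: Long): Long = id
}

object InitializedCollectionSketch {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  val edges: Seq[Edge[Long, Double]] = Seq(
    new Edge(1L, 2L, 1.0),
    new Edge(2L, 3L, 2.0))

  // Vertex IDs 1, 2 and 3 come from the edges; values come from IdAsValue.
  val graph: Graph[Long, Long, Double] =
    Graph.fromCollection(edges, new IdAsValue, env)
}
```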
def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
vertices: DataSet[(K, VV)],
edges: DataSet[(K, K, EV)],
env: ExecutionEnvironment
): Graph[K, VV, EV]
Creates a graph from tuple DataSets where:
vertices - tuples of the form (vertexId, vertexValue)
edges - tuples of the form (sourceId, targetId, edgeValue)

def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: DataSet[(K, K, EV)],
env: ExecutionEnvironment
): Graph[K, NullValue, EV]
Creates a graph from edge tuples only.
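A minimal sketch of building a graph from (sourceId, targetId, edgeValue) triples alone, assuming a local environment. EdgeTupleSketch is a hypothetical name, and the String edge values are purely illustrative.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

object EdgeTupleSketch {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  // (sourceId, targetId, edgeValue) triples.
  val edgeTuples: DataSet[(Long, Long, String)] = env.fromCollection(Seq(
    (1L, 2L, "follows"),
    (2L, 3L, "follows"),
    (3L, 1L, "blocks")))

  // No vertex DataSet: vertices 1, 2 and 3 are derived with NullValue values.
  val graph: Graph[Long, NullValue, String] = Graph.fromTupleDataSet(edgeTuples, env)
}
```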
def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
edges: DataSet[(K, K, EV)],
vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment
): Graph[K, VV, EV]
Creates a graph from edge tuples with vertex value initialization.
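A minimal sketch of the tuple variant with an initializer, assuming a local environment. UnitScore (which gives every vertex the starting value 1.0) and TupleInitializerSketch are hypothetical names.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph

// Hypothetical initializer: every vertex starts with the value 1.0.
class UnitScore extends MapFunction[Long, Double] {
  override def map(id: Long): Double = 1.0
}

object TupleInitializerSketch {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  val edgeTuples: DataSet[(Long, Long, Double)] = env.fromCollection(Seq(
    (1L, 2L, 0.5),
    (2L, 3L, 0.25)))

  // Tuple3 edges plus an initializer; vertex values are Double.
  val graph: Graph[Long, Double, Double] =
    Graph.fromTupleDataSet(edgeTuples, new UnitScore, env)
}
```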
def fromTuple2DataSet[K: TypeInformation : ClassTag](
edges: DataSet[(K, K)],
env: ExecutionEnvironment
): Graph[K, NullValue, NullValue]
Creates a graph from simple edge pairs with no values.
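For plain (sourceId, targetId) pairs both the vertex and edge value types are NullValue. A minimal sketch assuming a local environment; PairSketch is a hypothetical name.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

object PairSketch {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  // A simple path 1 -> 2 -> 3 -> 4 given as ID pairs.
  val pairs: DataSet[(Long, Long)] = env.fromCollection(Seq((1L, 2L), (2L, 3L), (3L, 4L)))

  val graph: Graph[Long, NullValue, NullValue] = Graph.fromTuple2DataSet(pairs, env)
}
```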
def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag](
edges: DataSet[(K, K)],
vertexValueInitializer: MapFunction[K, VV],
env: ExecutionEnvironment
): Graph[K, VV, NullValue]
Creates a graph from edge pairs with vertex value initialization.
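A minimal sketch of edge pairs combined with an initializer, assuming a local environment. ParityLabel and PairInitializerSketch are hypothetical names; the parity label only illustrates that the value can be any function of the vertex ID.

```scala
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.types.NullValue

// Hypothetical initializer: labels each vertex by the parity of its ID.
class ParityLabel extends MapFunction[Long, String] {
  override def map(id: Long): String = if (id % 2 == 0) "even" else "odd"
}

object PairInitializerSketch {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  val pairs: DataSet[(Long, Long)] = env.fromCollection(Seq((1L, 2L), (2L, 3L)))

  // Edges carry no value (NullValue); vertices get a String label.
  val graph: Graph[Long, String, NullValue] =
    Graph.fromTuple2DataSet(pairs, new ParityLabel, env)
}
```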
def fromCsvReader[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag](
env: ExecutionEnvironment,
pathEdges: String,
pathVertices: String = null,
lineDelimiterVertices: String = "\n",
fieldDelimiterVertices: String = ",",
quoteCharacterVertices: Character = null,
ignoreFirstLineVertices: Boolean = false,
ignoreCommentsVertices: String = null,
lenientVertices: Boolean = false,
includedFieldsVertices: Array[Int] = null,
lineDelimiterEdges: String = "\n",
fieldDelimiterEdges: String = ",",
quoteCharacterEdges: Character = null,
ignoreFirstLineEdges: Boolean = false,
ignoreCommentsEdges: String = null,
lenientEdges: Boolean = false,
includedFieldsEdges: Array[Int] = null,
vertexValueInitializer: MapFunction[K, VV] = null
): Graph[K, VV, EV]
Creates a graph from CSV files with extensive configuration options.
Parameters:
env - Flink execution environment
pathEdges - Path of the file containing the edges (required)
pathVertices - Path of the file containing the vertices (optional; default: null)
lineDelimiterVertices - Line separator for the vertices file (default: "\n")
fieldDelimiterVertices - Field separator for the vertices file (default: ",")
quoteCharacterVertices - Quote character for parsing the vertices file (default: null)
ignoreFirstLineVertices - Whether to skip the first line of the vertices file (default: false)
ignoreCommentsVertices - Prefix marking comment lines to ignore in the vertices file (default: null)
lenientVertices - Whether to silently skip malformed lines in the vertices file (default: false)
includedFieldsVertices - Indices of the fields to read from the vertices file (default: null)
lineDelimiterEdges - Line separator for the edges file (default: "\n")
fieldDelimiterEdges - Field separator for the edges file (default: ",")
quoteCharacterEdges - Quote character for parsing the edges file (default: null)
ignoreFirstLineEdges - Whether to skip the first line of the edges file (default: false)
ignoreCommentsEdges - Prefix marking comment lines to ignore in the edges file (default: null)
lenientEdges - Whether to silently skip malformed lines in the edges file (default: false)
includedFieldsEdges - Indices of the fields to read from the edges file (default: null)
vertexValueInitializer - Function that initializes vertex values when no vertices file is provided (default: null)

Examples:

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
val env = ExecutionEnvironment.getExecutionEnvironment
// From collections
val vertices = List(
new Vertex(1L, "Node A"),
new Vertex(2L, "Node B"),
new Vertex(3L, "Node C")
)
val edges = List(
new Edge(1L, 2L, 1.0),
new Edge(2L, 3L, 2.0),
new Edge(1L, 3L, 3.0)
)
val collectionGraph = Graph.fromCollection(vertices, edges, env)

// From tuple DataSets: (vertexId, vertexValue) and (sourceId, targetId, edgeValue)
val vertexTuples = env.fromCollection(List(
(1L, "A"),
(2L, "B"),
(3L, "C")
))
val edgeTuples = env.fromCollection(List(
(1L, 2L, 1.0),
(2L, 3L, 2.0)
))
val tupleGraph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

// From CSV files: tab-separated edges with a header line, comma-separated vertices
val csvGraph = Graph.fromCsvReader[Long, String, Double](
env = env,
pathEdges = "path/to/edges.csv",
pathVertices = "path/to/vertices.csv",
fieldDelimiterEdges = "\t",
ignoreFirstLineEdges = true
)
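fromCsvReader can also be used without a vertices file. In that case the parameter list above says vertex values come from vertexValueInitializer. A minimal sketch under those assumptions: it writes a small temporary edge file so it runs on a local environment, and ZeroInit and CsvEdgesOnlySketch are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph

// Hypothetical initializer: every vertex starts with the value 0.0.
class ZeroInit extends MapFunction[Long, Double] {
  override def map(id: Long): Double = 0.0
}

object CsvEdgesOnlySketch {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  // Write a tab-separated edge file with a header line to a temporary location.
  val edgeFile: Path = Files.createTempFile("edges", ".tsv")
  Files.write(edgeFile, "src\tdst\tweight\n1\t2\t0.5\n2\t3\t1.5\n".getBytes(StandardCharsets.UTF_8))

  // No pathVertices: vertices are derived from the edge IDs and initialized by ZeroInit.
  val graph: Graph[Long, Double, Double] = Graph.fromCsvReader[Long, Double, Double](
    env = env,
    pathEdges = edgeFile.toUri.toString,
    fieldDelimiterEdges = "\t",
    ignoreFirstLineEdges = true,
    vertexValueInitializer = new ZeroInit)
}
```

Named arguments keep the call readable: only the options that differ from the defaults listed above need to be spelled out.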