Scala API for Apache Flink's Gelly graph processing library providing distributed graph operations and algorithms
npx @tessl/cli install tessl/maven-org-apache-flink--flink-gelly-scala_2-10@1.3.0
Apache Flink Gelly Scala is a graph processing library that provides a high-level Scala API for distributed graph analytics and algorithms on Apache Flink. It lets developers build scalable graph processing applications in Scala's functional style, with support for vertex-centric, scatter-gather, and gather-sum-apply iterations and a broad set of graph operations.
Add to your Maven pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly-scala_2.10</artifactId>
<version>1.3.3</version>
</dependency>
import org.apache.flink.graph.scala._
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
// Set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// Create vertices and edges
val vertices = env.fromCollection(List(
new Vertex(1L, "A"),
new Vertex(2L, "B"),
new Vertex(3L, "C")
))
val edges = env.fromCollection(List(
new Edge(1L, 2L, 1.0),
new Edge(2L, 3L, 2.0)
))
// Create graph
val graph = Graph.fromDataSet(vertices, edges, env)
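// Basic operations: numberOfVertices() runs a Flink job and returns the count;
// getDegrees() is lazy and returns a DataSet of (vertex ID, degree) pairs
val vertexCount = graph.numberOfVertices()
val degrees = graph.getDegrees()
The Flink Gelly Scala library consists of several key components: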
// Main graph type with key (K), vertex value (VV), and edge value (EV) type parameters
final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
// Core graph elements from Flink Gelly Java API
class Vertex[K, VV](id: K, value: VV)
class Edge[K, EV](source: K, target: K, value: EV)
class Triplet[K, VV, EV](srcVertexId: K, trgVertexId: K, srcVertexValue: VV, trgVertexValue: VV, edgeValue: EV)
Create graphs from various data sources, including DataSets, collections, tuples, and CSV files.
// From DataSets
def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromDataSet[K, EV](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromDataSet[K, VV, EV](edges: DataSet[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV]
// From Collections
def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromCollection[K, EV](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
// From Tuples - convenient for creating graphs from structured data
def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromTupleDataSet[K, EV](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromTuple2DataSet[K](edges: DataSet[(K, K)], env: ExecutionEnvironment): Graph[K, NullValue, NullValue]
// From CSV Files - extensive configuration for file-based graph creation
def fromCsvReader[K, VV, EV](env: ExecutionEnvironment, pathEdges: String, ...): Graph[K, VV, EV]
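A minimal sketch of the tuple-based constructors, assuming a local `ExecutionEnvironment` and made-up sample data; the object name `TupleGraphSketch` is hypothetical. It builds one graph from `(id, value)` and `(source, target, value)` tuples and a second from bare `(source, target)` pairs, then counts vertices and edges.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._

// Hypothetical example object; data is illustrative only
object TupleGraphSketch {
  // Returns (vertices, edges) of the labelled graph, then of the bare graph
  def sizes(): (Long, Long, Long, Long) = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // (id, value) vertex tuples and (source, target, value) edge tuples
    val vertexTuples = env.fromElements((1L, "A"), (2L, "B"), (3L, "C"))
    val edgeTuples = env.fromElements((1L, 2L, 1.0), (2L, 3L, 2.0))
    val labelled = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

    // Edge-only construction: vertices come from the distinct edge endpoints,
    // and both vertex and edge values are NullValue
    val bare = Graph.fromTuple2DataSet(env.fromElements((1L, 2L), (2L, 3L), (3L, 1L)), env)

    // Each count triggers its own Flink job
    (labelled.numberOfVertices(), labelled.numberOfEdges(),
      bare.numberOfVertices(), bare.numberOfEdges())
  }

  def main(args: Array[String]): Unit = println(sizes())
}
```

The second graph reports three vertices even though no vertex data was supplied, because `fromTuple2DataSet` derives the vertex set from the edge endpoints.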
Access graph data and perform basic transformations.
// Access graph components
def getVertices: DataSet[Vertex[K, VV]]
def getEdges: DataSet[Edge[K, EV]]
def getVerticesAsTuple2(): DataSet[(K, VV)]
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
def getTriplets(): DataSet[Triplet[K, VV, EV]]
// Graph metrics
def numberOfVertices(): Long
def numberOfEdges(): Long
def getVertexIds(): DataSet[K]
def getEdgeIds(): DataSet[(K, K)]
// Map transformations
def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV]
def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV]
def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV]
// Translation operations
def translateGraphIds[NEW: TypeInformation : ClassTag](translator: TranslateFunction[K, NEW]): Graph[NEW, VV, EV]
def translateVertexValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[VV, NEW]): Graph[K, NEW, EV]
def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[EV, NEW]): Graph[K, VV, NEW]
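The Scala-function overloads of `mapVertices` and `mapEdges` can be exercised as in the sketch below, which assumes a local environment and illustrative data (`MapSketch` is a hypothetical name). Lambda parameter types are written out explicitly because both methods are overloaded, which can otherwise leave Scala unable to infer them.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala._

// Hypothetical example object; data is illustrative only
object MapSketch {
  def transform(): (Seq[(Long, String)], Seq[(Long, Long, Double)]) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val graph = Graph.fromCollection(
      Seq(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
      Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
      env)

    // Function-literal overloads: new vertex value = lower-cased label,
    // new edge value = weight scaled by 10
    val lowered = graph.mapVertices((v: Vertex[Long, String]) => v.getValue.toLowerCase)
    val scaled = lowered.mapEdges((e: Edge[Long, Double]) => e.getValue * 10)

    // Collect as Scala tuples and sort by the first field for stable output
    (scaled.getVerticesAsTuple2().collect().sortBy(_._1),
      scaled.getEdgesAsTuple3().collect().sortBy(_._1))
  }

  def main(args: Array[String]): Unit = println(transform())
}
```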
Perform analytics and computations on graph structure.
// Degree metrics
def inDegrees(): DataSet[(K, LongValue)]
def outDegrees(): DataSet[(K, LongValue)]
def getDegrees(): DataSet[(K, LongValue)]
// Structural transformations
def getUndirected(): Graph[K, VV, EV]
def reverse(): Graph[K, VV, EV]
def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV]
def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
// Graph set operations
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]
// Graph validation
def validate(validator: GraphValidator[K, VV, EV]): Boolean
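A sketch of the degree metrics and `getUndirected` on a small directed graph, assuming local execution and illustrative edges; `DegreeSketch` is a hypothetical name. Degrees arrive as `LongValue` and are unwrapped with `getValue` before comparison.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._

// Hypothetical example object; data is illustrative only
object DegreeSketch {
  // Returns (out-degrees, in-degrees, edge count of the undirected view)
  def metrics(): (Seq[(Long, Long)], Seq[(Long, Long)], Long) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Edge-only graph 1->2, 2->3, 1->3; vertex values become NullValue
    val graph = Graph.fromCollection(
      Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0), new Edge(1L, 3L, 5.0)), env)

    // Degrees are LongValue; unwrap them to plain Long
    val outDeg = graph.outDegrees().map(d => (d._1, d._2.getValue)).collect().sortBy(_._1)
    val inDeg = graph.inDegrees().map(d => (d._1, d._2.getValue)).collect().sortBy(_._1)

    // getUndirected() adds a reversed copy of every edge
    val undirectedEdges = graph.getUndirected().numberOfEdges()
    (outDeg, inDeg, undirectedEdges)
  }

  def main(args: Array[String]): Unit = println(metrics())
}
```

The expected output assumes that vertices with no out-edges (vertex 3) or no in-edges (vertex 1) are still reported, with a degree of 0.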
Support for implementing custom graph algorithms using different iteration patterns.
// Scatter-Gather iterations
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV], gatherFunction: GatherFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]
// Gather-Sum-Apply iterations
def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction: SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]
// Vertex-centric iterations (Pregel-style)
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M], combineFunction: MessageCombiner[K, M], maxIterations: Int): Graph[K, VV, EV]
// Algorithm framework integration
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
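As a sketch of `runVertexCentricIteration`, the classes below compute single-source shortest paths Pregel-style. Each vertex keeps the smallest distance it has seen and forwards improvements along its out-edges, while the combiner collapses messages to one minimum per target vertex. The class and object names are hypothetical, `ComputeFunction`, `MessageCombiner`, and `MessageIterator` are assumed to come from `org.apache.flink.graph.pregel` (Flink 1.3), and the graph data is illustrative.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, MessageIterator}
import org.apache.flink.graph.scala._

// Hypothetical compute function: keep the minimum distance, propagate improvements
final class ShortestDistance(srcId: Long) extends ComputeFunction[Long, Double, Double, Double] {
  override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
    // The source starts at 0; every other vertex relies on incoming messages
    var minDistance = if (vertex.getId == srcId) 0.0 else Double.PositiveInfinity
    while (messages.hasNext) {
      minDistance = math.min(minDistance, messages.next())
    }
    // Only an improvement updates the value and notifies the out-neighbours
    if (minDistance < vertex.getValue) {
      setNewVertexValue(minDistance)
      val edges = getEdges.iterator()
      while (edges.hasNext) {
        val edge = edges.next()
        sendMessageTo(edge.getTarget, minDistance + edge.getValue)
      }
    }
  }
}

// Hypothetical combiner: only the smallest message per target matters
final class MinDistanceCombiner extends MessageCombiner[Long, Double] {
  override def combineMessages(messages: MessageIterator[Double]): Unit = {
    var min = Double.PositiveInfinity
    while (messages.hasNext) {
      min = math.min(min, messages.next())
    }
    sendCombinedMessage(min)
  }
}

object VertexCentricSketch {
  def distances(): Seq[(Long, Double)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // All vertices start "unreached"; edges carry Double weights
    val vertices = Seq(1L, 2L, 3L).map(id => new Vertex(id, Double.PositiveInfinity))
    val edges = Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0), new Edge(1L, 3L, 5.0))
    val graph = Graph.fromCollection(vertices, edges, env)

    // At most 10 supersteps; this graph converges after a few
    val result = graph.runVertexCentricIteration(new ShortestDistance(1L), new MinDistanceCombiner, 10)
    result.getVerticesAsTuple2().collect().sortBy(_._1)
  }

  def main(args: Array[String]): Unit = println(distances())
}
```

In the first superstep only the source improves its value; vertex 3 first receives the direct distance 5.0 and later the shorter 3.0 through vertex 2, and the iteration stops once no vertex sends messages.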
Access to the comprehensive Gelly algorithm library through the Scala API.
Usage Examples:
import org.apache.flink.graph.library.{ConnectedComponents, SingleSourceShortestPaths}
// PageRank package assumed for Flink 1.3; verify against your Flink version
import org.apache.flink.graph.library.link_analysis.PageRank
// The algorithms are Java classes, so constructor arguments are positional
// (Scala named arguments do not work with Java-defined constructors)
// PageRank with damping factor 0.85 and 10 iterations
val pageRankResult = graph.run(new PageRank[Long, String, Double](0.85, 10))
// Connected Components - finds connected subgraphs (vertex values must be Comparable)
val components = graph.run(new ConnectedComponents[Long, String, Double](10))
// Single Source Shortest Paths from vertex with ID 1L, at most 10 iterations (Double edge values)
val shortestPaths = graph.run(new SingleSourceShortestPaths[Long, String](1L, 10))
Operations for working with vertex neighborhoods and performing aggregations.
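A minimal sketch of `reduceOnEdges`, assuming local execution and illustrative data (`SelectMinWeight` and `NeighborhoodSketch` are hypothetical names). The reduce function is applied pairwise to the values of each vertex's out-edges, which yields the lightest outgoing edge per vertex.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceEdgesFunction}
import org.apache.flink.graph.scala._

// Hypothetical reducer: keeps the smaller of two edge values
final class SelectMinWeight extends ReduceEdgesFunction[Double] {
  override def reduceEdges(first: Double, second: Double): Double = math.min(first, second)
}

object NeighborhoodSketch {
  def minOutWeights(): Seq[(Long, Double)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val graph = Graph.fromCollection(
      Seq(new Edge(1L, 2L, 1.0), new Edge(1L, 3L, 5.0), new Edge(2L, 3L, 2.0)), env)

    // OUT groups edges by source vertex; vertex 3 has no out-edges,
    // so it does not appear in the result
    graph.reduceOnEdges(new SelectMinWeight, EdgeDirection.OUT).collect().sortBy(_._1)
  }

  def main(args: Array[String]): Unit = println(minOutWeights())
}
```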
// Edge-based aggregations
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
// Neighbor-based aggregations
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
// Reduction operations
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]
Methods for adding and removing vertices and edges.
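Each mutation method returns a new `Graph` rather than modifying the existing one. The sketch below, assuming local execution and illustrative data (`MutationSketch` is a hypothetical name), chains `addVertex`, `addEdge`, and `removeVertex`, then counts what remains.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala._

// Hypothetical example object; data is illustrative only
object MutationSketch {
  // Returns (vertex count, edge count) after the mutations
  def sizesAfterMutation(): (Long, Long) = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val v1 = new Vertex(1L, "A")
    val v2 = new Vertex(2L, "B")
    val v3 = new Vertex(3L, "C")
    val v4 = new Vertex(4L, "D")
    val graph = Graph.fromCollection(Seq(v1, v2, v3),
      Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)), env)

    val mutated = graph
      .addVertex(v4)          // 4 vertices, 2 edges
      .addEdge(v3, v4, 3.0)   // 4 vertices, 3 edges (both endpoints already exist)
      .removeVertex(v2)       // drops vertex 2 together with its edges

    (mutated.numberOfVertices(), mutated.numberOfEdges())
  }

  def main(args: Array[String]): Unit = println(sizesAfterMutation())
}
```

Removing vertex 2 also removes the edges (1, 2) and (2, 3), since an edge is kept only while both of its endpoints remain, leaving three vertices and the single edge (3, 4).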
// Adding elements
def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV]
// Removing elements
def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV]
def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
Join graph data with external datasets to enrich vertex and edge information.
// Vertex joins
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV]
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV): Graph[K, VV, EV]
// Edge joins
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV): Graph[K, VV, EV]
def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
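A sketch of the function-literal overload of `joinWithVertices`, assuming local execution and illustrative data (`JoinSketch` is a hypothetical name). Each matched vertex value is combined with the joined value; vertex 2 has no matching record, so its value is expected to stay unchanged.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.scala._

// Hypothetical example object; data is illustrative only
object JoinSketch {
  def enrich(): Seq[(Long, Long)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val graph = Graph.fromCollection(
      Seq(new Vertex(1L, 10L), new Vertex(2L, 20L), new Vertex(3L, 30L)),
      Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
      env)

    // External (vertexId, bonus) records; vertex 2 has no entry
    val bonuses = env.fromElements((1L, 1L), (3L, 3L))

    // New value = old value + joined value; explicit parameter types
    // select the Scala-function overload
    val joined = graph.joinWithVertices(bonuses, (value: Long, bonus: Long) => value + bonus)
    joined.getVerticesAsTuple2().collect().sortBy(_._1)
  }

  def main(args: Array[String]): Unit = println(enrich())
}
```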