Scala API for Apache Flink's Gelly graph processing library with type-safe functional programming support
npx @tessl/cli install tessl/maven-org-apache-flink--flink-gelly-scala_2-11@1.14.0
Flink Gelly Scala provides idiomatic Scala APIs for Apache Flink's Gelly graph processing library. It offers type-safe, functional-programming support for large-scale graph processing, with operations covering graph transformations, iterative algorithms, and graph analytics, all integrated with Flink's distributed processing engine.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly-scala_2.11</artifactId>
<version>1.14.6</version>
</dependency>
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

// Create execution environment (DataSet API)
val env = ExecutionEnvironment.getExecutionEnvironment

// Create vertices (Long ID, String name) and edges (source ID, target ID, Double weight)
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob"),
  new Vertex(3L, "Charlie")
))
val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5),
  new Edge(2L, 3L, 0.3),
  new Edge(1L, 3L, 0.8)
))

// Create graph: inferred as Graph[Long, String, Double]
val graph = Graph.fromDataSet(vertices, edges, env)

// Perform basic operations.
// These calls only describe the dataflow; no job runs until an action is invoked.
val degrees = graph.getDegrees()
// "Alice" (5 chars) and "Charlie" (7 chars) pass the predicate; "Bob" (3 chars) does not.
val filteredGraph = graph.filterOnVertices(_.getValue.length > 3)
val mappedGraph = graph.mapVertices(v => v.getValue.toUpperCase)

// Trigger execution: print() is an action that runs the job and writes the
// results to standard output on the client.
degrees.print()
filteredGraph.getVerticesAsTuple2().print()
mappedGraph.getVerticesAsTuple2().print()
Flink Gelly Scala is built around several key components:
Core functionality for creating, modifying, and managing graph structures with various data sources and formats.
// Factory methods in Graph companion object
// All three build a Graph[K, VV, EV] from vertex data, edge data and the
// ExecutionEnvironment; they differ only in how the vertex/edge data is supplied.
// NOTE(review): the published signatures presumably also require implicit
// TypeInformation/ClassTag evidence for K, VV and EV (brought into scope by
// `import org.apache.flink.api.scala._`) — confirm against the Flink Scaladoc.
object Graph {
  /** From DataSets of Gelly Vertex and Edge objects. */
  def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
      env: ExecutionEnvironment): Graph[K, VV, EV]

  /** From in-memory Scala Seqs of Vertex and Edge objects. */
  def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]],
      env: ExecutionEnvironment): Graph[K, VV, EV]

  /** From tuple DataSets: vertices as (id, value), edges as (source, target, value). */
  def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
      env: ExecutionEnvironment): Graph[K, VV, EV]
}
// Graph modification methods
// Every method here returns a Graph[K, VV, EV], so calls can be chained.
// NOTE(review): presumably the receiver is left unchanged and a new graph is
// returned — confirm against the Flink Scaladoc.
class Graph[K, VV, EV] {
  /** Graph with `vertex` added. */
  def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]

  /**
   * Graph with an edge from `source` to `target` carrying `edgeValue`.
   * The endpoints are passed as full Vertex objects, not bare IDs.
   * NOTE(review): likely also adds an endpoint that is not yet in the graph — confirm.
   */
  def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV]

  /**
   * Graph without `vertex`.
   * NOTE(review): presumably the vertex's incident edges are dropped too — confirm.
   */
  def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]

  /** Combination of this graph with `graph`; both must share the same K, VV and EV types. */
  def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
}
Type-safe transformation operations for mapping vertex and edge values, filtering, and structural modifications.
class Graph[K, VV, EV] {
  // Value mapping: NV becomes the new vertex value type (mapVertices) or the new
  // edge value type (mapEdges), as the return types show; IDs (K) are unchanged.
  def mapVertices[NV](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
  def mapEdges[NV](fun: Edge[K, EV] => NV): Graph[K, VV, NV]

  // Filtering: Boolean predicates over vertices and/or edges; value types are unchanged.
  def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean,
      edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]
  def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV]

  // Re-keys the graph: the result uses NEW as its vertex-ID type.
  // NOTE(review): the second NEW argument of `fun` looks like a reusable output
  // object rather than an input value — confirm against the Flink Scaladoc.
  def translateGraphIds[NEW](fun: (K, NEW) => NEW): Graph[NEW, VV, EV]
}
Iterative computation models for implementing sophisticated graph algorithms using scatter-gather, gather-sum-apply, and vertex-centric paradigms.
class Graph[K, VV, EV] {
  // Each model takes user-defined functions plus `maxIterations` and returns a
  // graph with the same K, VV, EV types. M is the message type that the user
  // functions exchange.
  // NOTE(review): `maxIterations` presumably caps the number of rounds; the
  // computation may stop earlier on convergence — confirm.

  // Scatter-gather: only ScatterFunction is parameterized by the edge type EV,
  // so only the scatter side sees edge values.
  def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
      gatherFunction: GatherFunction[K, VV, M],
      maxIterations: Int): Graph[K, VV, EV]

  // Gather-sum-apply: three separate user functions.
  // NOTE(review): GSAGatherFunction presumably denotes Gelly's gather-sum-apply
  // GatherFunction, renamed here to distinguish it from the scatter-gather
  // GatherFunction above — confirm the actual import.
  def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
      sumFunction: SumFunction[VV, EV, M],
      applyFunction: ApplyFunction[K, VV, M],
      maxIterations: Int): Graph[K, VV, EV]

  // Vertex-centric: a single ComputeFunction plus a MessageCombiner for messages of type M.
  def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
      combineFunction: MessageCombiner[K, M],
      maxIterations: Int): Graph[K, VV, EV]
}
Built-in graph metrics, degree calculations, neighborhood operations, and custom reduction functions for graph analysis.
class Graph[K, VV, EV] {
  // Degree queries: DataSets of (K, LongValue) pairs, i.e. a vertex ID with a
  // count held in Flink's LongValue type.
  // NOTE(review): getDegrees presumably counts edges in both directions, while
  // inDegrees/outDegrees count one direction each — confirm.
  def getDegrees(): DataSet[(K, LongValue)]
  def inDegrees(): DataSet[(K, LongValue)]
  def outDegrees(): DataSet[(K, LongValue)]

  // Global counts, returned as plain Longs rather than DataSets.
  // NOTE(review): producing a concrete Long presumably forces job execution — confirm.
  def numberOfVertices(): Long
  def numberOfEdges(): Long

  // Neighborhood operations. `direction` (EdgeDirection.IN / OUT / ALL) selects
  // which edges define a vertex's neighborhood.
  // reduceOnNeighbors folds neighbor values of type VV into (K, VV) results.
  def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV],
      direction: EdgeDirection): DataSet[(K, VV)]
  // groupReduceOnNeighbors hands neighborhoods to a NeighborsFunction, whose
  // Collector[T] may emit any number of T results.
  def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunction[K, VV, EV, T],
      direction: EdgeDirection): DataSet[T]
}
Abstract base classes for implementing custom graph processing functions with access to vertex values, edges, and neighborhoods.
// All three callbacks return Unit; results are emitted through the Collector[T] argument.
abstract class EdgesFunction[K, EV, T] {
  // Receives edges, each paired with a key of type K.
  // NOTE(review): the K is presumably the ID of the vertex whose neighborhood is
  // being processed — confirm.
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}
abstract class NeighborsFunction[K, VV, EV, T] {
  // Receives (K, edge, vertex) triples, so vertex values are available alongside the edges.
  // NOTE(review): the Vertex is presumably the neighbor reached via the edge — confirm.
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])],
      out: Collector[T]): Unit
}
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] {
  // Receives a full Vertex (ID and value) together with a group of edges.
  // NOTE(review): presumably the edges incident to `v` — confirm.
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}
Operations for joining graphs with external datasets and converting between different data representations.
class Graph[K, VV, EV] {
  // Value joins: inputDataSet is keyed like the vertices (K) or the edges
  // (source K, target K), and `fun` merges the current value with the joined T
  // into a value of the same type, so the graph's type parameters are unchanged.
  // NOTE(review): vertices/edges without a matching key presumably keep their
  // old value — confirm.
  def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
      fun: (VV, T) => VV): Graph[K, VV, EV]
  def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
      fun: (EV, T) => EV): Graph[K, VV, EV]

  // Conversions to plain-tuple and triplet views of the graph.
  def getVerticesAsTuple2(): DataSet[(K, VV)]
  def getEdgesAsTuple3(): DataSet[(K, K, EV)]
  def getTriplets(): DataSet[Triplet[K, VV, EV]]
}

// The Scala graph type: K = vertex-ID type, VV = vertex-value type, EV = edge-value type.
class Graph[K, VV, EV]
// Core graph elements (from Java Gelly)
// Vertex: an ID of type K carrying a value of type VV.
class Vertex[K, VV](id: K, value: VV)
// Edge: source and target IDs of type K carrying a value of type EV.
class Edge[K, EV](source: K, target: K, value: EV)
// NOTE(review): in Java Gelly, Triplet is presumably a tuple holding the endpoint
// IDs/values and the edge value rather than the three objects themselves — confirm
// the exact field layout.
class Triplet[K, VV, EV] // Represents (srcVertex, edge, trgVertex)

// Direction enumeration
// Passed to the neighborhood methods (e.g. reduceOnNeighbors) to choose which edges count.
// NOTE(review): shown as an object with vals; presumably a Java enum in Gelly — confirm.
object EdgeDirection {
  val IN: EdgeDirection
  val OUT: EdgeDirection
  val ALL: EdgeDirection
}

// Utility mappers
// MapFunctions converting between Gelly Vertex/Edge objects and the (K, VV) /
// (K, K, EV) tuple shapes used by fromTupleDataSet, getVerticesAsTuple2 and getEdgesAsTuple3.
class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]]
class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]]
class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)]
class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)]