Scala API for Apache Flink's Gelly graph processing library with type-safe functional programming support
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Flink Gelly Scala provides idiomatic Scala APIs for Apache Flink's Gelly graph processing library. It offers type-safe, functional-style APIs for large-scale graph processing, with operations for graph transformations, iterative algorithms, and graph analytics. Graphs are backed by Flink `DataSet`s of vertices and edges and are executed by Flink's distributed processing engine.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly-scala_2.11</artifactId>
<version>1.14.6</version>
</dependency>

import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._
// Quick-start: build a small three-vertex graph and run a few basic operations on it.

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create vertices and edges
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob"),
  new Vertex(3L, "Charlie")
))
val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5),
  new Edge(2L, 3L, 0.3),
  new Edge(1L, 3L, 0.8)
))

// Create graph
val graph = Graph.fromDataSet(vertices, edges, env)

// Perform basic operations
val degrees = graph.getDegrees()
// Keeps vertices whose name is longer than 3 characters ("Alice", "Charlie"); "Bob" is dropped.
val filteredGraph = graph.filterOnVertices(_.getValue.length > 3)
val mappedGraph = graph.mapVertices(v => v.getValue.toUpperCase)

// The operations above only build a lazy DataSet plan; nothing is computed until the
// program hits a sink. print() triggers execution and prints the results on the client
// (each print() call runs its own job). Output order is not guaranteed.
degrees.print()
filteredGraph.getVerticesAsTuple2().print()
mappedGraph.getVerticesAsTuple2().print()

Flink Gelly Scala is built around several key components:
Core functionality for creating, modifying, and managing graph structures with various data sources and formats.
// Factory methods in Graph companion object.
// Every factory returns a Graph[K, VV, EV]: K is the vertex id type, VV the vertex
// value type and EV the edge value type.
// NOTE(review): the real Gelly Scala factories also take implicit TypeInformation /
// ClassTag evidence for K, VV and EV (normally brought into scope by
// `import org.apache.flink.api.scala._`); those context bounds are omitted here — confirm.
object Graph {
/** Creates a graph from a DataSet of vertices and a DataSet of edges. */
def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
env: ExecutionEnvironment): Graph[K, VV, EV]
/** Creates a graph from local Scala collections of vertices and edges. */
def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]],
env: ExecutionEnvironment): Graph[K, VV, EV]
/**
 * Creates a graph from tuple DataSets: vertices as `(id, value)` and edges as
 * `(sourceId, targetId, value)`.
 */
def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
env: ExecutionEnvironment): Graph[K, VV, EV]
}
// Graph modification methods.
// Each method returns a Graph[K, VV, EV]; callers must use the returned graph.
// NOTE(review): the receiver is presumably left unchanged (Flink DataSets are
// immutable) — confirm against the Gelly documentation.
class Graph[K, VV, EV] {
/** Returns a graph that also contains `vertex`. */
def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
/**
 * Returns a graph with an edge from `source` to `target` carrying `edgeValue`.
 * Endpoints are passed as whole vertices (id and value), not as bare ids.
 * NOTE(review): endpoints missing from the graph are presumably added too — confirm.
 */
def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV]
/** Returns a graph without `vertex` (presumably also without its incident edges — confirm). */
def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
/** Returns the union of this graph and `graph`, which must have the same K, VV and EV types. */
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
}
Type-safe transformation operations for mapping vertex and edge values, filtering, and structural modifications.
// Graph transformation methods: value mapping, filtering and id translation.
class Graph[K, VV, EV] {
/** Applies `fun` to every vertex; its result becomes the vertex value, so the value type changes from VV to NV. */
def mapVertices[NV](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
/** Applies `fun` to every edge; its result becomes the edge value, so the value type changes from EV to NV. */
def mapEdges[NV](fun: Edge[K, EV] => NV): Graph[K, VV, NV]
/**
 * Returns the graph restricted by a vertex predicate and an edge predicate.
 * NOTE(review): edges whose endpoints are filtered out are presumably dropped as well — confirm.
 */
def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean,
edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]
/** Like `subgraph`, but filters on vertices only. */
def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV]
/**
 * Translates vertex ids from K to NEW, producing a Graph[NEW, VV, EV].
 * NOTE(review): the second `NEW` argument of `fun` presumably mirrors the reusable
 * output object of Java Gelly's TranslateFunction#translate(value, reuse) — confirm.
 */
def translateGraphIds[NEW](fun: (K, NEW) => NEW): Graph[NEW, VV, EV]
}
Iterative computation models for implementing sophisticated graph algorithms using scatter-gather, gather-sum-apply, and vertex-centric paradigms.
// Iterative computation models. Each takes user functions plus `maxIterations`, which
// bounds the number of iterations. Each returns a graph whose vertex value type VV is unchanged.
class Graph[K, VV, EV] {
/**
 * Scatter-gather model: `scatterFunction` (which sees edge values EV) sends messages
 * of type M; `gatherFunction` uses the received messages to update vertex values.
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
gatherFunction: GatherFunction[K, VV, M],
maxIterations: Int): Graph[K, VV, EV]
/**
 * Gather-sum-apply model: `gatherFunction` produces a partial value M per neighbor,
 * `sumFunction` combines partial values, and `applyFunction` updates the vertex value.
 * NOTE(review): `GSAGatherFunction` stands for `org.apache.flink.graph.gsa.GatherFunction`,
 * distinct from the scatter-gather `GatherFunction` above — confirm the import alias.
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
sumFunction: SumFunction[VV, EV, M],
applyFunction: ApplyFunction[K, VV, M],
maxIterations: Int): Graph[K, VV, EV]
/**
 * Vertex-centric model: `computeFunction` runs per vertex, and `combineFunction`
 * combines messages of type M addressed to the same vertex.
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
combineFunction: MessageCombiner[K, M],
maxIterations: Int): Graph[K, VV, EV]
}
Built-in graph metrics, degree calculations, neighborhood operations, and custom reduction functions for graph analysis.
// Graph metrics, degrees and neighborhood operations.
class Graph[K, VV, EV] {
/** Per-vertex degree as `(vertexId, degree)` pairs (presumably in-degree + out-degree — confirm). */
def getDegrees(): DataSet[(K, LongValue)]
/** Per-vertex in-degree as `(vertexId, degree)` pairs. */
def inDegrees(): DataSet[(K, LongValue)]
/** Per-vertex out-degree as `(vertexId, degree)` pairs. */
def outDegrees(): DataSet[(K, LongValue)]
/** Number of vertices. NOTE(review): returns a plain Long, so this presumably runs a Flink job eagerly. */
def numberOfVertices(): Long
/** Number of edges. NOTE(review): returns a plain Long, so this presumably runs a Flink job eagerly. */
def numberOfEdges(): Long
/**
 * Reduces the values of each vertex's neighbors, selected by `direction`, with
 * `reduceNeighborsFunction`; yields one `(vertexId, reducedValue)` pair per vertex.
 */
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV],
direction: EdgeDirection): DataSet[(K, VV)]
/**
 * Passes each vertex's neighborhood, selected by `direction`, to `neighborsFunction`,
 * which may emit any number of T values.
 */
def groupReduceOnNeighbors[T](neighborsFunction: NeighborsFunction[K, VV, EV, T],
direction: EdgeDirection): DataSet[T]
}
Abstract base classes for implementing custom graph processing functions with access to vertex values, edges, and neighborhoods.
/**
 * User function over a vertex's edges, without access to vertex values.
 * Implement `iterateEdges` and emit any number of T results through `out`.
 */
abstract class EdgesFunction[K, EV, T] {
/** `edges` pairs a vertex id (presumably the vertex being processed — confirm) with one of its edges. */
def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}
/**
 * User function over a vertex's neighborhood: connecting edges together with neighbor vertices.
 * Implement `iterateNeighbors` and emit any number of T results through `out`.
 */
abstract class NeighborsFunction[K, VV, EV, T] {
/**
 * Each element of `neighbors` is `(id, edge, vertex)`; presumably the processed vertex's id,
 * the connecting edge and the neighbor vertex — confirm.
 */
def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])],
out: Collector[T]): Unit
}
/**
 * User function over a vertex's edges that also receives the vertex itself (id and value).
 * Implement `iterateEdges` and emit any number of T results through `out`.
 */
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] {
def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}
Operations for joining graphs with external datasets and converting between different data representations.
// Joins with external DataSets and conversions to other representations.
class Graph[K, VV, EV] {
/**
 * Joins vertices with `inputDataSet` on vertex id and sets each matched vertex's value
 * to `fun(currentValue, t)`. The value type VV is unchanged.
 * NOTE(review): unmatched vertices presumably keep their value — confirm.
 */
def joinWithVertices[T](inputDataSet: DataSet[(K, T)],
fun: (VV, T) => VV): Graph[K, VV, EV]
/**
 * Joins edges with `inputDataSet` on `(sourceId, targetId)` and sets each matched edge's
 * value to `fun(currentValue, t)`. The value type EV is unchanged.
 */
def joinWithEdges[T](inputDataSet: DataSet[(K, K, T)],
fun: (EV, T) => EV): Graph[K, VV, EV]
/** Vertices as `(id, value)` tuples. */
def getVerticesAsTuple2(): DataSet[(K, VV)]
/** Edges as `(sourceId, targetId, value)` tuples. */
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
/** Edges together with the values of their source and target vertices, as Triplets. */
def getTriplets(): DataSet[Triplet[K, VV, EV]]
}
// Main Scala graph type: vertex id K, vertex value VV, edge value EV.
class Graph[K, VV, EV]
// Core graph elements (from Java Gelly)
/** A vertex: an id of type K and a value of type VV. */
class Vertex[K, VV](id: K, value: VV)
/** An edge from `source` to `target` (both vertex ids of type K) carrying a value of type EV. */
class Edge[K, EV](source: K, target: K, value: EV)
class Triplet[K, VV, EV] // Represents (srcVertex, edge, trgVertex)
// Direction enumeration: selects which edges of a vertex an operation considers,
// e.g. the `direction` parameter of reduceOnNeighbors / groupReduceOnNeighbors.
object EdgeDirection {
/** Incoming edges: the vertex is the edge target. */
val IN: EdgeDirection
/** Outgoing edges: the vertex is the edge source. */
val OUT: EdgeDirection
/** Both incoming and outgoing edges. */
val ALL: EdgeDirection
}
// Utility mappers: MapFunctions that convert between Gelly element types and Scala
// tuples (for example, inside DataSet.map). Each conversion has a counterpart in the
// opposite direction.
/** `(id, value)` tuple to Vertex. */
class Tuple2ToVertexMap[K, VV] extends MapFunction[(K, VV), Vertex[K, VV]]
/** `(sourceId, targetId, value)` tuple to Edge. */
class Tuple3ToEdgeMap[K, EV] extends MapFunction[(K, K, EV), Edge[K, EV]]
/** Vertex to `(id, value)` tuple. */
class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)]
/** Edge to `(sourceId, targetId, value)` tuple. */
class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)]