Scala API for Apache Flink's Gelly graph processing library providing distributed graph operations and algorithms
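API for implementing custom graph algorithms with Gelly's iteration models and for running algorithms through the GraphAlgorithm and GraphAnalytic interfaces.
Flink Gelly Scala provides three iteration models for implementing graph algorithms: scatter-gather, gather-sum-apply (GSA), and vertex-centric. Each is started from a method on Graph that comes in two overloads, one with the default configuration and one that also takes a configuration object: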
```scala
def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a scatter-gather iteration on the graph without configuration options.

Parameters:
- scatterFunction: Function that scatters messages from vertices to their neighbors
- gatherFunction: Function that gathers messages and updates vertex values
- maxIterations: Maximum number of iterations to perform

Returns: Updated graph after convergence or after the maximum number of iterations

```scala
def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int,
  parameters: ScatterGatherConfiguration
): Graph[K, VV, EV]
```

Runs a scatter-gather iteration with configuration options.

Parameters:
- scatterFunction: Message scattering function
- gatherFunction: Message gathering and vertex update function
- maxIterations: Maximum number of iterations
- parameters: Iteration configuration parameters

```scala
def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a gather-sum-apply (GSA) iteration without configuration options.

Parameters:
- gatherFunction: Collects information about adjacent vertices and edges
- sumFunction: Aggregates the gathered information
- applyFunction: Updates vertex values with the aggregated data
- maxIterations: Maximum number of iterations

```scala
def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int,
  parameters: GSAConfiguration
): Graph[K, VV, EV]
```

Runs a GSA iteration with configuration parameters.

Parameters:
- gatherFunction: Information gathering function
- sumFunction: Aggregation function
- applyFunction: Vertex update function
- maxIterations: Maximum number of iterations
- parameters: GSA configuration parameters

```scala
def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int
): Graph[K, VV, EV]
```

Runs a vertex-centric iteration without configuration options.

Parameters:
- computeFunction: Vertex compute function that processes messages and updates values
- combineFunction: Optional message combiner function
- maxIterations: Maximum number of iterations

```scala
def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int,
  parameters: VertexCentricConfiguration
): Graph[K, VV, EV]
```

Runs a vertex-centric iteration with configuration parameters.

Parameters:
- computeFunction: Vertex computation function
- combineFunction: Message combiner for reducing messages
- maxIterations: Maximum number of iterations
- parameters: Vertex-centric configuration parameters

```scala
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
```

Executes a graph algorithm that implements the GraphAlgorithm interface.

Parameters:
- algorithm: Graph algorithm implementation

Returns: Algorithm result of type T

```scala
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
```

Executes a graph analytic. Analytics are terminal operations whose results are retrieved via accumulators, allowing composition of multiple analytics and algorithms into a single program.

Parameters:
- analytic: Graph analytic implementation

Returns: The analytic instance for result retrieval
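```scala
abstract class ScatterFunction[K, VV, M, EV] {
  def sendMessages(vertex: Vertex[K, VV]): Unit   // implemented by the user
  def sendMessageTo(targetVertexId: K, message: M): Unit
  def sendMessageToAllNeighbors(message: M): Unit
  def getEdges: java.lang.Iterable[Edge[K, EV]]
  def getSuperstepNumber: Int
}
```

Base class for scatter functions (package org.apache.flink.graph.spargel) that send messages from vertices to their neighbors. getEdges returns the edges in the direction set with ScatterGatherConfiguration.setDirection, which is outgoing edges by default.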
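```scala
abstract class GatherFunction[K, VV, M] {
  def updateVertex(vertex: Vertex[K, VV], inMessages: MessageIterator[M]): Unit   // implemented by the user
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
}
```

Base class for gather functions in scatter-gather iterations (package org.apache.flink.graph.spargel). A vertex's value changes only if updateVertex calls setNewVertexValue.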
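You can model the scatter and gather roles outside Flink to see what one superstep computes. The sketch below is plain Scala with no Flink dependency. WeightedEdge and ScatterGatherModel are hypothetical names, not Gelly classes. It runs single-source shortest paths in two steps per superstep:

- The scatter step sends distance + edge weight along each out-edge of a reached vertex.
- The gather step keeps the minimum message per target and updates a vertex only when that minimum improves its distance.

For brevity, every reached vertex scatters in every superstep. For a min-based computation this reaches the same fixed point.

```scala
// Hypothetical stand-in for a weighted directed edge; not Gelly's Edge class.
case class WeightedEdge(src: Long, dst: Long, weight: Double)

// Plain-Scala model of scatter-gather shortest paths (illustration only, not Gelly code).
object ScatterGatherModel {
  def superstep(dist: Map[Long, Double], edges: Seq[WeightedEdge]): Map[Long, Double] = {
    // Scatter: every reached vertex sends distance + weight along each out-edge.
    val messages = edges.collect {
      case e if dist(e.src) < Double.MaxValue => (e.dst, dist(e.src) + e.weight)
    }
    // Gather: keep the minimum message per target vertex.
    val minPerTarget = messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).min }
    // Update a vertex only when the gathered minimum improves its distance.
    dist.map { case (v, d) => v -> minPerTarget.get(v).filter(_ < d).getOrElse(d) }
  }

  // Run supersteps until nothing changes or maxIterations is reached.
  def run(initial: Map[Long, Double], edges: Seq[WeightedEdge], maxIterations: Int): Map[Long, Double] = {
    var dist = initial
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      val next = superstep(dist, edges)
      changed = next != dist
      dist = next
      iteration += 1
    }
    dist
  }
}
```

Take edges 1->2 (weight 1.0), 1->3 (4.0), 2->3 (2.0) and 3->4 (1.0), with vertex 1 as the source. The model settles on distances 0, 1, 3 and 4. The fourth superstep is the first one that changes nothing.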
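```scala
abstract class GatherFunction[VV, EV, M] {
  def gather(neighbor: Neighbor[VV, EV]): M
}
```

Base class for gather functions in GSA iterations (package org.apache.flink.graph.gsa). The Neighbor argument exposes the neighbor's value (getNeighborValue) and the value of the connecting edge (getEdgeValue).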
```scala
abstract class SumFunction[VV, EV, M] {
  def sum(newValue: M, currentValue: M): M
}
```

Base class for sum functions that aggregate gathered information in GSA iterations.
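```scala
abstract class ApplyFunction[K, VV, M] {
  def apply(newValue: M, currentValue: VV): Unit   // implemented by the user
  def setResult(result: VV): Unit
  def getSuperstepNumber: Int
}
```

Base class for apply functions that update vertex values in GSA iterations. apply receives the aggregated value and the current vertex value, and emits the new vertex value with setResult. A vertex for which setResult is not called keeps its current value.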
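You can model the three GSA phases the same way. This plain-Scala sketch runs connected components and has no Flink dependency. GsaModel is a hypothetical name. The phases work as follows:

- Gather reads each neighbor's component id.
- Sum reduces the candidates pairwise with min.
- Apply adopts the result only when it is smaller than the current id.

The sketch treats edges as undirected.

```scala
// Plain-Scala model of gather-sum-apply connected components (illustration only, not Gelly code).
object GsaModel {
  def superstep(values: Map[Long, Long], edges: Seq[(Long, Long)]): Map[Long, Long] = {
    // Assumption of this model: edges are undirected, so ids flow in both directions.
    val both = edges ++ edges.map { case (a, b) => (b, a) }
    // Gather: one candidate per edge (the neighbor's current id), keyed by the receiving vertex.
    val gathered = both.map { case (src, dst) => (dst, values(src)) }
    // Sum: reduce each vertex's candidates pairwise with min.
    val summed = gathered.groupBy(_._1).map { case (v, cs) =>
      v -> cs.map(_._2).reduce((a, b) => math.min(a, b))
    }
    // Apply: adopt the aggregate only if it is smaller than the current id.
    values.map { case (v, cur) => v -> summed.get(v).filter(_ < cur).getOrElse(cur) }
  }

  // Repeat supersteps until no id changes or maxIterations is exhausted.
  @annotation.tailrec
  def run(values: Map[Long, Long], edges: Seq[(Long, Long)], maxIterations: Int): Map[Long, Long] =
    if (maxIterations <= 0) values
    else {
      val next = superstep(values, edges)
      if (next == values) values else run(next, edges, maxIterations - 1)
    }
}
```

Starting from each vertex's own id, the components {1, 2, 3} and {4, 5} converge to the ids 1 and 4. The isolated vertex 6 keeps id 6.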
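```scala
abstract class ComputeFunction[K, VV, EV, M] {
  def compute(vertex: Vertex[K, VV], messages: MessageIterator[M]): Unit   // implemented by the user
  def sendMessageTo(targetVertexId: K, message: M): Unit
  def sendMessageToAllNeighbors(message: M): Unit
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
  def getEdges: java.lang.Iterable[Edge[K, EV]]
}
```

Base class for compute functions in vertex-centric iterations (package org.apache.flink.graph.pregel). compute receives a vertex together with the messages sent to it in the previous superstep. Superstep numbers start at 1.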
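A vertex-centric iteration folds scattering and gathering into a single compute step per vertex. The plain-Scala sketch below propagates the maximum vertex value along directed edges. VertexCentricModel is a hypothetical name, and the sketch has no Flink dependency. The activation rules are assumptions of the model:

- Every vertex computes in superstep 1.
- Afterwards, only vertices with incoming messages compute.
- The loop stops once no messages are sent or maxIterations is reached.

```scala
// Plain-Scala model of a vertex-centric (Pregel-style) iteration; illustration only, not Gelly code.
object VertexCentricModel {
  // Returns the final vertex values and the number of supersteps executed.
  def run(initial: Map[Int, Int], outNeighbors: Map[Int, Seq[Int]], maxIterations: Int): (Map[Int, Int], Int) = {
    var values = initial
    // Superstep 1: every vertex is active, with an empty inbox.
    var inbox: Map[Int, Seq[Int]] = initial.map { case (v, _) => v -> Seq.empty[Int] }
    var superstep = 1
    while (inbox.nonEmpty && superstep <= maxIterations) {
      val outgoing = Seq.newBuilder[(Int, Int)]
      // The "compute" step for every vertex that has an inbox entry this superstep.
      for ((v, msgs) <- inbox) {
        val current = values(v)
        val candidate = (current +: msgs).max
        if (superstep == 1 || candidate > current) {
          values = values.updated(v, candidate)          // like setNewVertexValue
          for (n <- outNeighbors.getOrElse(v, Seq.empty[Int])) {
            outgoing += (n -> candidate)                  // like sendMessageTo
          }
        }
      }
      // Messages sent now are delivered in the next superstep.
      inbox = outgoing.result().groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2) }
      superstep += 1
    }
    (values, superstep - 1)
  }
}
```

On the cycle 1->2->3->1 with values 3, 6 and 2, the value 6 reaches every vertex. The run stops after superstep 4, which sends no messages.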
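```scala
abstract class MessageCombiner[K, M] {
  def combineMessages(messages: MessageIterator[M]): Unit   // implemented by the user
  def sendCombinedMessage(combinedMessage: M): Unit
}
```

Base class for combining multiple messages sent to the same vertex. combineMessages receives the messages addressed to one target vertex and emits the combined message with sendCombinedMessage.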
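A combiner replaces all messages addressed to the same vertex with a single message. This can reduce the number of messages the next superstep has to process. Because the grouping and order of combination are not under the user's control, the reduction should not depend on message order; min and sum both satisfy this. The plain-Scala sketch below shows the effect of a min combiner. CombinerModel is a hypothetical helper, not Gelly's MessageCombiner.

```scala
// Plain-Scala model of message combining (illustration only, not Gelly code):
// messages are grouped by target vertex and each group is reduced to one message.
object CombinerModel {
  def combine[K, M](messages: Seq[(K, M)])(reduce: (M, M) => M): Map[K, M] =
    messages.groupBy(_._1).map { case (target, ms) => target -> ms.map(_._2).reduce(reduce) }
}
```

Four messages for vertices 7 and 8 shrink to two: vertex 7 receives only the minimum, 3.0, and vertex 8 receives 2.0.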
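```scala
class ScatterGatherConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
  def setDirection(direction: EdgeDirection): Unit
}
```

Configuration for scatter-gather iterations. The setters return Unit, so call them as separate statements rather than chaining them. setOptNumVertices(true) makes the total vertex count available through getNumberOfVertices inside the iteration functions. setDirection selects the edges along which messages are sent.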
```scala
class GSAConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
  def setDirection(direction: EdgeDirection): Unit
}
```

Configuration for gather-sum-apply iterations. setDirection selects which neighbors the gather function reads.
```scala
class VertexCentricConfiguration {
  def setName(name: String): Unit
  def setParallelism(parallelism: Int): Unit
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): Unit
  def setOptNumVertices(optNumVertices: Boolean): Unit
}
```

Configuration for vertex-centric iterations.
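The following sketch shows how these settings might be applied. It assumes the Flink 1.x Gelly API, in which the setters return nothing, so they are called as statements rather than chained. It builds a ScatterGatherConfiguration for the configurable runScatterGatherIteration overload. The object name ScatterGatherConfigExample, the iteration name and the parallelism of 4 are illustrative choices, not defaults.

```scala
import org.apache.flink.graph.spargel.ScatterGatherConfiguration

// Hypothetical helper that builds the `parameters` argument of runScatterGatherIteration.
object ScatterGatherConfigExample {
  def build(): ScatterGatherConfiguration = {
    val parameters = new ScatterGatherConfiguration()
    parameters.setName("Gelly SSSP")       // iteration name shown in logs and messages
    parameters.setParallelism(4)           // parallelism of the iteration operators
    parameters.setOptNumVertices(true)     // expose the vertex count via getNumberOfVertices
    parameters
  }
}
```

The result is passed as the last argument of the overload, for example graph.runScatterGatherIteration(new DistanceScatter(), new DistanceGather(), 10, ScatterGatherConfigExample.build()).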
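Scatter-gather example (single-source shortest paths):

```scala
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
import scala.collection.JavaConverters._

// Scatter function: sends current distance + edge weight to neighbors
class DistanceScatter extends ScatterFunction[Long, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
    // Vertices not reached yet (distance Double.MaxValue) send nothing
    if (vertex.getValue < Double.MaxValue) {
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}

// Gather function: takes the minimum of the received distances
class DistanceGather extends GatherFunction[Long, Double, Double] {
  override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]): Unit = {
    var min = Double.MaxValue
    while (inMessages.hasNext) {
      min = math.min(min, inMessages.next())
    }
    // Only set a new value when the distance improves
    if (min < vertex.getValue) {
      setNewVertexValue(min)
    }
  }
}

// Run the algorithm on graph: Graph[Long, Double, Double], whose source vertex
// starts at 0.0 and all other vertices at Double.MaxValue
val shortestPaths = graph.runScatterGatherIteration(
  new DistanceScatter(),
  new DistanceGather(),
  maxIterations = 10
)
```

Vertex-centric example (PageRank). This is a simplified sketch that does not redistribute the rank of vertices without out-edges:

```scala
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.pregel.{ComputeFunction, MessageIterator}
import scala.collection.JavaConverters._

// The vertex count is passed in explicitly rather than queried inside compute()
class PageRankCompute(numVertices: Long, maxIterations: Int)
    extends ComputeFunction[Long, Double, Double, Double] {
  override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
    val dampingFactor = 0.85
    var sum = 0.0
    while (messages.hasNext) {
      sum += messages.next()
    }
    val newValue = (1.0 - dampingFactor) / numVertices + dampingFactor * sum
    setNewVertexValue(newValue)
    // Supersteps are numbered from 1; skip sending in the last one
    if (getSuperstepNumber < maxIterations) {
      // The edge iterable is single-pass, so materialize it before taking its size
      val outEdges = getEdges.asScala.toList
      if (outEdges.nonEmpty) {
        val msgValue = newValue / outEdges.size
        for (edge <- outEdges) {
          sendMessageTo(edge.getTarget, msgValue)
        }
      }
    }
  }
}

// Run PageRank on graph: Graph[Long, Double, Double]
val maxIterations = 10
val numVertices = graph.numberOfVertices()
val pageRankResult = graph.runVertexCentricIteration(
  new PageRankCompute(numVertices, maxIterations),
  null, // no message combiner
  maxIterations
)
```

Gather-sum-apply example (connected components):

```scala
import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
import org.apache.flink.types.NullValue

// Gather: read the neighbor's current component id
class ComponentIdGather extends GatherFunction[Long, NullValue, Long] {
  override def gather(neighbor: Neighbor[Long, NullValue]): Long =
    neighbor.getNeighborValue
}

// Sum: keep the smaller of two candidate ids
class ComponentIdSum extends SumFunction[Long, NullValue, Long] {
  override def sum(newValue: Long, currentValue: Long): Long =
    math.min(newValue, currentValue)
}

// Apply: adopt the aggregated id only if it is smaller than the current one
class ComponentIdApply extends ApplyFunction[Long, Long, Long] {
  override def apply(newValue: Long, currentValue: Long): Unit = {
    if (newValue < currentValue) {
      setResult(newValue)
    }
  }
}

// Run connected components on graph: Graph[Long, Long, NullValue], where each vertex
// value starts as the vertex's own id (use graph.getUndirected for undirected connectivity)
val components = graph.runGatherSumApplyIteration(
  new ComponentIdGather(),
  new ComponentIdSum(),
  new ComponentIdApply(),
  maxIterations = 10
)
```

Library algorithm example (PageRank):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.library.PageRank

// Use the pre-implemented PageRank algorithm on graph: Graph[Long, Double, Double].
// The constructor is defined in Java, so its arguments are passed positionally
// (damping factor, maximum number of iterations).
val pageRankAlgorithm = new PageRank[Long](0.85, 10)
val pageRankResult = graph.run(pageRankAlgorithm)
// The result is a DataSet[Vertex[Long, Double]] of vertex IDs and PageRank scores.
// first(10) returns 10 arbitrary vertices, not the 10 highest-ranked ones.
val sample = pageRankResult.first(10)
```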
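To sanity-check PageRank scores on a small graph without a cluster, you can compare them against a plain power iteration of the formula used in the vertex-centric example: rank(v) = (1 - d) / N + d * sum over in-neighbors u of rank(u) / outDegree(u). PageRankReference is a hypothetical helper, not part of Gelly. It assumes every vertex has at least one out-edge, because it does not handle dangling vertices.

```scala
// Plain-Scala PageRank power iteration for small in-memory graphs (reference only, not Gelly code).
// Assumes every vertex has at least one out-edge (no dangling-vertex handling).
object PageRankReference {
  def run(outEdges: Map[Long, Seq[Long]], dampingFactor: Double, iterations: Int): Map[Long, Double] = {
    val vertices: Set[Long] = outEdges.keySet ++ outEdges.values.flatten
    val n = vertices.size.toDouble
    // Start from a uniform distribution.
    var ranks: Map[Long, Double] = vertices.map(v => v -> 1.0 / n).toMap
    for (_ <- 1 to iterations) {
      // Each vertex splits its current rank evenly across its out-edges.
      val contributions = for {
        (src, targets) <- outEdges.toSeq
        target <- targets
      } yield target -> (ranks(src) / targets.size)
      val incoming = contributions.groupBy(_._1).map { case (v, cs) => v -> cs.map(_._2).sum }
      ranks = vertices.map { v =>
        v -> ((1.0 - dampingFactor) / n + dampingFactor * incoming.getOrElse(v, 0.0))
      }.toMap
    }
    ranks
  }
}
```

On a directed cycle every vertex keeps a rank of 1/N. On any graph without dangling vertices the ranks keep summing to 1, which is a quick consistency check for distributed results.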