Complete API for implementing custom graph algorithms using different iteration patterns and the algorithm framework.
Flink Gelly Scala provides three main iteration patterns for implementing graph algorithms: scatter-gather, gather-sum-apply (GSA), and vertex-centric.
def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]

Runs a scatter-gather iteration on the graph without configuration options.

Parameters:
scatterFunction - Function that scatters messages from vertices to neighbors
gatherFunction - Function that gathers messages and updates vertex values
maxIterations - Maximum number of iterations to perform

Returns: Updated graph after convergence or maximum iterations
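To make the superstep model concrete, the following plain-Scala sketch mimics what a scatter-gather iteration does for single-source shortest paths, with no Flink dependency. It is an illustration under assumptions, not Gelly's implementation: `ScatterGatherSketch`, `WeightedEdge`, and `shortestPaths` are hypothetical names, the graph is held in ordinary collections, and the sketch assumes that only vertices whose value changed in the previous superstep send messages.

```scala
object ScatterGatherSketch {
  // Hypothetical stand-in for a directed, weighted edge.
  final case class WeightedEdge(source: Long, target: Long, weight: Double)

  /** Runs at most maxIterations supersteps; stops early once no value changes. */
  def shortestPaths(
      initial: Map[Long, Double],
      edges: Seq[WeightedEdge],
      maxIterations: Int
  ): Map[Long, Double] = {
    var values = initial
    // Initially, every vertex with a known distance is active.
    var active: Set[Long] = initial.filter(_._2 < Double.MaxValue).keySet
    var superstep = 0
    while (superstep < maxIterations && active.nonEmpty) {
      // Scatter: each active vertex sends (distance + edge weight) along its out-edges.
      val messages: Map[Long, Seq[Double]] = edges
        .filter(e => active.contains(e.source))
        .map(e => e.target -> (values(e.source) + e.weight))
        .groupBy(_._1)
        .map { case (target, msgs) => target -> msgs.map(_._2) }

      // Gather: a vertex adopts the smallest received distance if it improves on its value.
      val updated: Map[Long, Double] = messages
        .map { case (id, msgs) => id -> msgs.min }
        .filter { case (id, candidate) => candidate < values.getOrElse(id, Double.MaxValue) }

      values = values ++ updated
      active = updated.keySet
      superstep += 1
    }
    values
  }
}
```

With edges 1 to 2 (weight 1.0), 2 to 3 (weight 2.0), and 1 to 3 (weight 5.0), and vertex 1 as the source, the sketch settles on distances 0.0, 1.0, and 3.0 after two supersteps. Capping `maxIterations` at 1 leaves vertex 3 at 5.0, which shows why the iteration limit affects the result.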
def runScatterGatherIteration[M](
  scatterFunction: ScatterFunction[K, VV, M, EV],
  gatherFunction: GatherFunction[K, VV, M],
  maxIterations: Int,
  parameters: ScatterGatherConfiguration
): Graph[K, VV, EV]

Runs a scatter-gather iteration with configuration options.

Parameters:
scatterFunction - Message scattering function
gatherFunction - Message gathering and vertex update function
maxIterations - Maximum iterations
parameters - Iteration configuration parameters

def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int
): Graph[K, VV, EV]

Runs a Gather-Sum-Apply iteration without configuration options.

Parameters:
gatherFunction - Collects information about adjacent vertices and edges
sumFunction - Aggregates the gathered information
applyFunction - Updates vertex values with the aggregated data
maxIterations - Maximum number of iterations

def runGatherSumApplyIteration[M](
  gatherFunction: GatherFunction[VV, EV, M],
  sumFunction: SumFunction[VV, EV, M],
  applyFunction: ApplyFunction[K, VV, M],
  maxIterations: Int,
  parameters: GSAConfiguration
): Graph[K, VV, EV]

Runs a GSA iteration with configuration parameters.

Parameters:
gatherFunction - Information gathering function
sumFunction - Aggregation function
applyFunction - Vertex update function
maxIterations - Maximum iterations
parameters - GSA configuration parameters

def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int
): Graph[K, VV, EV]

Runs a vertex-centric iteration without configuration options.

Parameters:
computeFunction - Vertex compute function that processes messages and updates values
combineFunction - Optional message combiner function (pass null to run without a combiner)
maxIterations - Maximum number of iterations

def runVertexCentricIteration[M](
  computeFunction: ComputeFunction[K, VV, EV, M],
  combineFunction: MessageCombiner[K, M],
  maxIterations: Int,
  parameters: VertexCentricConfiguration
): Graph[K, VV, EV]

Runs a vertex-centric iteration with configuration parameters.

Parameters:
computeFunction - Vertex computation function
combineFunction - Message combiner for reducing messages
maxIterations - Maximum iterations
parameters - Vertex-centric configuration parameters

def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T

Executes a graph algorithm that implements the GraphAlgorithm interface.

Parameters:
algorithm - Graph algorithm implementation

Returns: Algorithm result of type T
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]

Executes a graph analytic. Analytics are terminal operations whose results are retrieved via accumulators, allowing composition of multiple analytics and algorithms into a single program.

Parameters:
analytic - Graph analytic implementation

Returns: The analytic instance for result retrieval
abstract class ScatterFunction[K, VV, M, EV] {
  def sendMessages(vertex: Vertex[K, VV]): Unit
  def sendMessageTo(targetVertexId: K, message: M): Unit
  def getOutEdges: Iterable[Edge[K, EV]]
  def getInEdges: Iterable[Edge[K, EV]]
}

Base class for scatter functions that send messages from vertices to their neighbors.
abstract class GatherFunction[K, VV, M] {
  def gather(messages: Iterable[M]): VV
  def getSuperstepNumber: Int
}

Base class for gather functions in scatter-gather iterations (package org.apache.flink.graph.spargel).
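abstract class GatherFunction[VV, EV, M] {
  def gather(neighborVertex: Vertex[_, VV], edge: Edge[_, EV]): M
}

Base class for gather functions in GSA iterations (package org.apache.flink.graph.gsa). It shares its name with the scatter-gather GatherFunction but lives in a different package.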
abstract class SumFunction[VV, EV, M] {
  def sum(newValue: M, currentValue: M): M
}

Base class for sum functions that aggregate gathered information in GSA iterations.
abstract class ApplyFunction[K, VV, M] {
  def apply(newValue: M, currentValue: VV): VV
  def getSuperstepNumber: Int
}

Base class for apply functions that update vertex values in GSA iterations.
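The division of labor between the three GSA functions can be illustrated with a plain-Scala sketch of connected components by minimum-label propagation. This is a simplified model under assumptions rather than Gelly's execution engine: `GsaSketch`, `gatherLabel`, `sumMin`, and `applyMin` are hypothetical names, edges are treated as undirected, and every edge endpoint is assumed to appear in `vertexIds`.

```scala
object GsaSketch {
  // Gather: produce one partial value per (vertex, neighbor) pair; here the neighbor's label.
  def gatherLabel(neighborLabel: Long): Long = neighborLabel

  // Sum: combine two partial values; it must be associative and commutative
  // because the reduction order is not fixed.
  def sumMin(a: Long, b: Long): Long = math.min(a, b)

  // Apply: merge the aggregated value into the vertex's current value.
  def applyMin(summed: Long, current: Long): Long = math.min(summed, current)

  /** Runs at most maxIterations supersteps; stops early once no label changes. */
  def connectedComponents(
      vertexIds: Set[Long],
      edges: Seq[(Long, Long)],
      maxIterations: Int
  ): Map[Long, Long] = {
    // Each vertex starts with its own ID as its component label.
    var labels: Map[Long, Long] = vertexIds.map(id => id -> id).toMap
    // Expand each undirected edge into both directions.
    val neighborPairs = edges.flatMap { case (s, t) => Seq(s -> t, t -> s) }
    var changed = true
    var superstep = 0
    while (superstep < maxIterations && changed) {
      val gathered: Seq[(Long, Long)] =
        neighborPairs.map { case (v, n) => v -> gatherLabel(labels(n)) }
      val summed: Map[Long, Long] =
        gathered.groupBy(_._1).map { case (v, xs) => v -> xs.map(_._2).reduce(sumMin) }
      val next: Map[Long, Long] = labels.map { case (v, current) =>
        v -> summed.get(v).fold(current)(s => applyMin(s, current))
      }
      changed = next != labels
      labels = next
      superstep += 1
    }
    labels
  }
}
```

Vertices without neighbors never receive a gathered value, so the sketch leaves their label untouched instead of calling `applyMin`.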
abstract class ComputeFunction[K, VV, EV, M] {
  def compute(vertex: Vertex[K, VV], messages: Iterable[M]): Unit
  def sendMessageTo(targetVertexId: K, message: M): Unit
  def setNewVertexValue(newValue: VV): Unit
  def getSuperstepNumber: Int
  def getOutEdges: Iterable[Edge[K, EV]]
  def getTotalNumVertices: Long
}

Base class for compute functions in vertex-centric iterations.
abstract class MessageCombiner[K, M] {
  def combineMessages(messages: Iterable[M]): M
}

Base class for combining multiple messages sent to the same vertex.
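As a rough illustration of what a combiner buys you, the plain-Scala sketch below collapses all messages addressed to the same target vertex into a single message by keeping the minimum. `CombinerSketch` and `combineMin` are hypothetical names, and messages are modeled as simple (target ID, value) pairs rather than Gelly types.

```scala
object CombinerSketch {
  /** Reduces the messages for each target vertex to one message: the minimum value. */
  def combineMin(messages: Seq[(Long, Double)]): Map[Long, Double] =
    messages
      .groupBy(_._1) // group by target vertex ID
      .map { case (target, msgs) => target -> msgs.map(_._2).min }
}
```

Combining is only safe when the compute function would reach the same result from the combined message as from the full set. That holds for minimum and sum, but not, for example, for a median.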
class ScatterGatherConfiguration {
  def setName(name: String): ScatterGatherConfiguration
  def setParallelism(parallelism: Int): ScatterGatherConfiguration
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): ScatterGatherConfiguration
  def setOptNumVertices(numVertices: Long): ScatterGatherConfiguration
}

Configuration for scatter-gather iterations.
class GSAConfiguration {
  def setName(name: String): GSAConfiguration
  def setParallelism(parallelism: Int): GSAConfiguration
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): GSAConfiguration
  def setOptNumVertices(numVertices: Long): GSAConfiguration
}

Configuration for gather-sum-apply iterations.
class VertexCentricConfiguration {
  def setName(name: String): VertexCentricConfiguration
  def setParallelism(parallelism: Int): VertexCentricConfiguration
  def setSolutionSetUnmanagedMemory(unmanagedMemory: Boolean): VertexCentricConfiguration
  def setOptNumVertices(numVertices: Long): VertexCentricConfiguration
}

Configuration for vertex-centric iterations.
Example: single-source shortest paths with a scatter-gather iteration

import org.apache.flink.graph.Vertex
import org.apache.flink.graph.spargel.{ScatterFunction, GatherFunction}

// Scatter function sends current distance + edge weight to neighbors
class DistanceScatter extends ScatterFunction[Long, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
    // Vertices still at Double.MaxValue have not been reached yet and stay silent
    if (vertex.getValue < Double.MaxValue) {
      for (edge <- getOutEdges) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}

// Gather function takes minimum of received distances
class DistanceGather extends GatherFunction[Long, Double, Double] {
  override def gather(messages: Iterable[Double]): Double =
    messages.foldLeft(Double.MaxValue)((min, msg) => math.min(min, msg))
}

// Run algorithm
val shortestPaths = graph.runScatterGatherIteration(
  new DistanceScatter(),
  new DistanceGather(),
  maxIterations = 10
)

Example: PageRank with a vertex-centric iteration

import org.apache.flink.graph.pregel.ComputeFunction
import org.apache.flink.graph.{Vertex, Edge}
class PageRankCompute(maxIterations: Int) extends ComputeFunction[Long, Double, Double, Double] {
  override def compute(vertex: Vertex[Long, Double], messages: Iterable[Double]): Unit = {
    val dampingFactor = 0.85
    val numVertices = getTotalNumVertices

    // Sum the rank contributions received from in-neighbors
    val sum = messages.sum

    val newValue = (1.0 - dampingFactor) / numVertices + dampingFactor * sum
    setNewVertexValue(newValue)

    // Send messages to neighbors
    if (getSuperstepNumber < maxIterations - 1) {
      val outDegree = getOutEdges.size
      if (outDegree > 0) {
        val msgValue = newValue / outDegree
        for (edge <- getOutEdges) {
          sendMessageTo(edge.getTarget, msgValue)
        }
      }
    }
  }
}

// Run PageRank
val maxIterations = 10
val pageRankResult = graph.runVertexCentricIteration(
  new PageRankCompute(maxIterations),
  null, // No message combiner
  maxIterations
)

Example: connected components with a gather-sum-apply iteration

import org.apache.flink.graph.gsa.{GatherFunction, SumFunction, ApplyFunction}
import org.apache.flink.graph.{Vertex, Edge}
import org.apache.flink.types.NullValue

// Gather: read the component ID of each neighbor
class ComponentIdGather extends GatherFunction[Long, NullValue, Long] {
  override def gather(neighborVertex: Vertex[_, Long], edge: Edge[_, NullValue]): Long = {
    neighborVertex.getValue
  }
}

// Sum: keep the smallest component ID seen so far
class ComponentIdSum extends SumFunction[Long, NullValue, Long] {
  override def sum(newValue: Long, currentValue: Long): Long = {
    math.min(newValue, currentValue)
  }
}

// Apply: adopt the smallest neighbor ID if it is lower than the current one
class ComponentIdApply extends ApplyFunction[Long, Long, Long] {
  override def apply(newValue: Long, currentValue: Long): Long = {
    math.min(newValue, currentValue)
  }
}

// Run connected components
val components = graph.runGatherSumApplyIteration(
  new ComponentIdGather(),
  new ComponentIdSum(),
  new ComponentIdApply(),
  maxIterations = 10
)

Example: running a pre-implemented library algorithm

import org.apache.flink.graph.library.PageRank
// Use pre-implemented PageRank algorithm.
// Arguments are passed positionally (damping factor, then maximum iterations):
// Scala does not support named arguments for Java-defined constructors.
val pageRankAlgorithm = new PageRank[Long](0.85, 10)
val pageRankResult = graph.run(pageRankAlgorithm)

// The result is a DataSet pairing each vertex ID with its PageRank score
val topVertices = pageRankResult.first(10)
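
To sanity-check scores on a small graph, it can help to compute PageRank locally. The plain-Scala sketch below applies the same update rule as the vertex-centric example, newValue = (1 - dampingFactor) / numVertices + dampingFactor * sum, to in-memory collections. It is not the library implementation: `PageRankSketch` and `pageRank` are hypothetical names, ranks start at 1 / numVertices, and vertices without out-edges simply send nothing.

```scala
object PageRankSketch {
  /** Runs exactly maxIterations rounds of the PageRank update on an in-memory graph. */
  def pageRank(
      vertexIds: Seq[Long],
      edges: Seq[(Long, Long)],
      dampingFactor: Double,
      maxIterations: Int
  ): Map[Long, Double] = {
    val n = vertexIds.size
    val outDegree: Map[Long, Int] = edges.groupBy(_._1).map { case (s, es) => s -> es.size }
    // Assumption of this sketch: start from a uniform distribution.
    var ranks: Map[Long, Double] = vertexIds.map(id => id -> 1.0 / n).toMap

    for (_ <- 0 until maxIterations) {
      // Each vertex sends rank / outDegree along every out-edge; sum per target.
      val incoming: Map[Long, Double] = edges
        .map { case (s, t) => t -> ranks(s) / outDegree(s) }
        .groupBy(_._1)
        .map { case (t, msgs) => t -> msgs.map(_._2).sum }

      // Apply the damped update to every vertex, including those with no incoming messages.
      ranks = vertexIds.map { id =>
        id -> ((1.0 - dampingFactor) / n + dampingFactor * incoming.getOrElse(id, 0.0))
      }.toMap
    }
    ranks
  }
}
```

On a directed cycle every vertex has one in-edge and one out-edge, so the uniform distribution is a fixed point and each rank stays at 1 / numVertices. That makes a cycle a convenient smoke test.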