Iterative computation models for implementing graph algorithms with the scatter-gather, gather-sum-apply, and vertex-centric (Pregel-style) paradigms, along with an algorithm execution framework.
Execute pre-built algorithms and analytics on graphs with type-safe result handling.
/**
 * Run a graph algorithm on the graph.
 * @param algorithm the algorithm to run on the Graph
 * @return the result of the graph algorithm
 */
def run[T](algorithm: GraphAlgorithm[K, VV, EV, T]): T
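The result type T of run is fixed by the algorithm argument, so the compiler knows what each call returns without casts. The following is a minimal plain-Scala sketch of that pattern; SimpleGraph, SimpleAlgorithm, OutDegrees and EdgeCount are hypothetical names invented for illustration and are not part of Gelly.

```scala
// Hypothetical in-memory model of the run[T] pattern; not the Flink Gelly API.
trait SimpleAlgorithm[K, VV, EV, T] {
  def run(graph: SimpleGraph[K, VV, EV]): T
}

final case class SimpleEdge[K, EV](src: K, dst: K, value: EV)

final case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[SimpleEdge[K, EV]]) {
  // The algorithm's type parameter T determines the static type of the result.
  def run[T](algorithm: SimpleAlgorithm[K, VV, EV, T]): T = algorithm.run(this)
}

// An algorithm whose result is a per-vertex out-degree map.
final class OutDegrees[K, VV, EV] extends SimpleAlgorithm[K, VV, EV, Map[K, Int]] {
  def run(graph: SimpleGraph[K, VV, EV]): Map[K, Int] = {
    val counts = graph.edges.groupBy(_.src).map { case (k, es) => k -> es.size }
    graph.vertices.keys.map(k => k -> counts.getOrElse(k, 0)).toMap
  }
}

// An algorithm whose result is a single number.
final class EdgeCount[K, VV, EV] extends SimpleAlgorithm[K, VV, EV, Long] {
  def run(graph: SimpleGraph[K, VV, EV]): Long = graph.edges.size.toLong
}

val g = SimpleGraph(
  Map(1L -> "a", 2L -> "b", 3L -> "c"),
  Seq(SimpleEdge(1L, 2L, 1.0), SimpleEdge(1L, 3L, 1.0), SimpleEdge(2L, 3L, 1.0))
)
val degrees: Map[Long, Int] = g.run(new OutDegrees[Long, String, Double])
val edgeCount: Long = g.run(new EdgeCount[Long, String, Double])
```

Assigning either result to the wrong type (for example, degrees to a Long) is a compile-time error rather than a runtime failure.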
/**
 * Run a graph analytic on the graph.
 * A GraphAnalytic is similar to a GraphAlgorithm but is terminal, and its results
 * are retrieved via accumulators. A Flink program has a single point of
 * execution, so a GraphAnalytic defers execution to the user; this allows
 * multiple analytics and algorithms to be composed into a single program.
 * @param analytic the analytic to run on the Graph
 * @return the same analytic; its results can be read after the program has executed
 */
def run[T](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]

Scatter-gather: an iterative computation model in which vertices scatter messages along their edges and gather the messages sent by their neighbors to update their values.
/**
 * Runs a scatter-gather iteration on the graph.
 * No configuration options are provided.
 * @param scatterFunction the scatter function
 * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
 * @tparam M the message type
 * @return the updated Graph after the scatter-gather iteration has converged or
 *         after maxIterations iterations
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                 gatherFunction: GatherFunction[K, VV, M],
                                 maxIterations: Int): Graph[K, VV, EV]
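To make the superstep structure concrete, here is a minimal in-memory sketch of scatter-gather single-source shortest paths. It is plain Scala, not the Gelly runtime; WEdge and scatterGatherSSSP are hypothetical names, and it assumes every edge endpoint appears in the vertex set. Each superstep scatters from the vertices updated in the previous one, gathers the minimum message per vertex, and stops early once no value changes.

```scala
// Hypothetical in-memory model of scatter-gather supersteps (not the Gelly runtime).
final case class WEdge(src: Long, dst: Long, weight: Double)

// Assumes every edge endpoint is contained in `vertices`.
def scatterGatherSSSP(vertices: Set[Long], edges: Seq[WEdge],
                      source: Long, maxIterations: Int): Map[Long, Double] = {
  var dist: Map[Long, Double] =
    vertices.map(v => v -> (if (v == source) 0.0 else Double.PositiveInfinity)).toMap
  var active: Set[Long] = vertices // every vertex scatters in the first superstep
  var iteration = 0
  while (iteration < maxIterations && active.nonEmpty) {
    // Scatter: each active, reached vertex sends distance + weight along its out-edges
    val messages: Seq[(Long, Double)] = for {
      e <- edges
      if active.contains(e.src) && dist(e.src) < Double.PositiveInfinity
    } yield e.dst -> (dist(e.src) + e.weight)
    // Gather: each vertex takes the minimum incoming message...
    val best: Map[Long, Double] =
      messages.groupBy(_._1).map { case (v, msgs) => v -> msgs.map(_._2).min }
    // ...and keeps it only if it improves the current distance
    val updates = best.filter { case (v, d) => d < dist(v) }
    dist = dist ++ updates
    active = updates.keySet // only updated vertices scatter in the next superstep
    iteration += 1
  }
  dist
}

val weightedEdges = Seq(WEdge(1L, 2L, 1.0), WEdge(1L, 3L, 4.0), WEdge(2L, 3L, 2.0), WEdge(3L, 4L, 1.0))
val ssspDistances = scatterGatherSSSP(Set(1L, 2L, 3L, 4L, 5L), weightedEdges, 1L, 10)
```

Vertex 5 has no incoming path, so it keeps Double.PositiveInfinity. Calling the function with maxIterations = 1 shows the cutoff: vertex 3 then still holds the direct-edge distance 4.0 instead of the shorter 3.0 through vertex 2.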
/**
 * Runs a scatter-gather iteration on the graph with configuration options.
 * @param scatterFunction the scatter function
 * @param gatherFunction the gather function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @tparam M the message type
 * @return the updated Graph after the scatter-gather iteration has converged or
 *         after maxIterations iterations
 */
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV],
                                 gatherFunction: GatherFunction[K, VV, M],
                                 maxIterations: Int,
                                 parameters: ScatterGatherConfiguration): Graph[K, VV, EV]

Gather-Sum-Apply (GSA): a three-phase iterative computation model for graph algorithms that collect information from each vertex's neighborhood (gather), aggregate it (sum), and update the vertex values with the aggregate (apply).
/**
 * Runs a Gather-Sum-Apply iteration on the graph.
 * No configuration options are provided.
 * @param gatherFunction the gather function collects information about adjacent
 *                       vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @tparam M the intermediate type used between gather, sum and apply
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 *         after maxIterations iterations
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
                                  sumFunction: SumFunction[VV, EV, M],
                                  applyFunction: ApplyFunction[K, VV, M],
                                  maxIterations: Int): Graph[K, VV, EV]
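The three phases can be sketched in memory for PageRank. This is plain Scala, not the Gelly runtime; Link and gsaPageRank are hypothetical names. It assumes vertices are numbered 0 until numVertices, that each edge value is 1 / (out-degree of its source), and it uses the unnormalized update (1 - d) + d * sum with ranks starting at 1.0. Only vertices that received gathered values are updated, and the others keep their current rank.

```scala
// Hypothetical in-memory model of gather-sum-apply supersteps for PageRank (not the Gelly runtime).
final case class Link(src: Int, dst: Int)

// Vertices are 0 until numVertices; every link source is assumed to be a valid vertex.
def gsaPageRank(numVertices: Int, links: Seq[Link],
                damping: Double, maxIterations: Int): Vector[Double] = {
  val outDegree: Map[Int, Int] = links.groupBy(_.src).map { case (v, ls) => v -> ls.size }
  var rank: Vector[Double] = Vector.fill(numVertices)(1.0) // unnormalized: ranks start at 1.0
  for (_ <- 0 until maxIterations) {
    // Gather: per in-link, the neighbor's rank times the edge value 1 / outDegree(src)
    val gathered: Seq[(Int, Double)] = links.map(l => l.dst -> rank(l.src) / outDegree(l.src))
    // Sum: combine the gathered values for each target with an associative +
    val summed: Map[Int, Double] =
      gathered.groupBy(_._1).map { case (v, xs) => v -> xs.map(_._2).sum }
    // Apply: vertices that received values compute a new rank; others keep theirs
    val current = rank
    rank = Vector.tabulate(numVertices) { v =>
      summed.get(v).map(s => (1.0 - damping) + damping * s).getOrElse(current(v))
    }
  }
  rank
}

val links = Seq(Link(0, 1), Link(0, 2), Link(1, 2), Link(2, 0))
val ranksAfterOne = gsaPageRank(3, links, 0.85, 1)
val ranks = gsaPageRank(3, links, 0.85, 50)
```

Because the sum step only needs an associative, commutative combination, partial sums can be formed in any order; here every vertex has an out-link, so the ranks keep summing to the number of vertices.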
/**
 * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
 * @param gatherFunction the gather function collects information about adjacent
 *                       vertices and edges
 * @param sumFunction the sum function aggregates the gathered information
 * @param applyFunction the apply function updates the vertex values with the aggregates
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @tparam M the intermediate type used between gather, sum and apply
 * @return the updated Graph after the gather-sum-apply iteration has converged or
 *         after maxIterations iterations
 */
def runGatherSumApplyIteration[M](gatherFunction: GSAGatherFunction[VV, EV, M],
                                  sumFunction: SumFunction[VV, EV, M],
                                  applyFunction: ApplyFunction[K, VV, M],
                                  maxIterations: Int,
                                  parameters: GSAConfiguration): Graph[K, VV, EV]

Vertex-centric (Pregel-style): an iterative computation model in which each vertex receives messages, performs a computation, and sends messages to its neighbors.
/**
 * Runs a vertex-centric iteration on the graph.
 * No configuration options are provided.
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @tparam M the message type
 * @return the updated Graph after the vertex-centric iteration has converged or
 *         after maxIterations iterations
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                 combineFunction: MessageCombiner[K, M],
                                 maxIterations: Int): Graph[K, VV, EV]
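The compute/combine cycle can be illustrated with a minimal in-memory sketch of connected components by minimum-label propagation. This is plain Scala, not the Gelly runtime; combineMin and pregelMinLabel are hypothetical names, edges are treated as undirected, and every edge endpoint is assumed to have an initial label. In the first superstep every vertex sends its label; afterwards only vertices that received messages run, and the loop ends when no messages are sent or maxIterations supersteps have run.

```scala
// Hypothetical in-memory model of Pregel-style supersteps (not the Gelly runtime).
// Each vertex value is a component label; messages carry candidate labels.

// Combiner: reduce all messages addressed to one vertex to their minimum before delivery.
def combineMin(messages: Seq[(Int, Int)]): Map[Int, Int] =
  messages.groupBy(_._1).map { case (target, ms) => target -> ms.map(_._2).min }

// Assumes every edge endpoint has an entry in `initial`; edges are treated as undirected.
def pregelMinLabel(initial: Map[Int, Int], edges: Seq[(Int, Int)],
                   maxIterations: Int): Map[Int, Int] = {
  val neighbors: Map[Int, Seq[Int]] =
    (edges ++ edges.map(_.swap)).groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
  // Each sender forwards its label to all of its neighbors; messages are then combined
  def send(senders: Map[Int, Int]): Map[Int, Int] =
    combineMin(for {
      (v, label) <- senders.toSeq
      n <- neighbors.getOrElse(v, Seq.empty)
    } yield n -> label)

  var values = initial
  // Superstep 1: compute runs on every vertex, which sends its own label to its neighbors
  var inbox = send(values)
  var superstep = 1
  while (superstep < maxIterations && inbox.nonEmpty) {
    // compute: a vertex adopts a smaller incoming label and forwards it; otherwise it stays silent
    val improved = inbox.filter { case (v, label) => label < values(v) }
    values = values ++ improved
    inbox = send(improved)
    superstep += 1
  }
  values
}

val edgesCC = Seq(3 -> 2, 2 -> 1, 5 -> 4)
val initialLabels = (1 to 6).map(v => v -> v).toMap
val components = pregelMinLabel(initialLabels, edgesCC, 10)
```

In the first superstep vertex 2 is sent two labels (from vertices 1 and 3); the combiner delivers only their minimum. Because min is associative and commutative, combining changes the message volume but not the result.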
/**
 * Runs a vertex-centric iteration on the graph with configuration options.
 * @param computeFunction the compute function
 * @param combineFunction the optional message combiner function
 * @param maxIterations maximum number of iterations to perform
 * @param parameters the iteration configuration parameters
 * @tparam M the message type
 * @return the updated Graph after the vertex-centric iteration has converged or
 *         after maxIterations iterations
 */
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M],
                                 combineFunction: MessageCombiner[K, M],
                                 maxIterations: Int,
                                 parameters: VertexCentricConfiguration): Graph[K, VV, EV]

Validate graph properties and constraints using custom validators.
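A validator is a Boolean check over the whole graph. As a minimal plain-Scala sketch (SimpleValidator, EndpointsExistValidator and NoSelfLoopsValidator are hypothetical names, not Gelly classes), the first check below mirrors the idea behind Gelly's InvalidVertexIdsValidator: every edge endpoint must refer to an existing vertex.

```scala
// Hypothetical plain-Scala model of graph validators; not the Gelly GraphValidator class.
trait SimpleValidator[K] {
  def validate(vertexIds: Set[K], edges: Seq[(K, K)]): Boolean
}

// Valid only if every edge endpoint refers to an existing vertex id.
final class EndpointsExistValidator[K] extends SimpleValidator[K] {
  def validate(vertexIds: Set[K], edges: Seq[(K, K)]): Boolean =
    edges.forall { case (src, dst) => vertexIds.contains(src) && vertexIds.contains(dst) }
}

// Valid only if the graph contains no self-loops.
final class NoSelfLoopsValidator[K] extends SimpleValidator[K] {
  def validate(vertexIds: Set[K], edges: Seq[(K, K)]): Boolean =
    edges.forall { case (src, dst) => src != dst }
}

val endpointsValidator = new EndpointsExistValidator[Long]
val noSelfLoopsValidator = new NoSelfLoopsValidator[Long]

val ids = Set(1L, 2L, 3L)
val goodEdges = Seq(1L -> 2L, 2L -> 3L)
val danglingEdges = Seq(1L -> 2L, 2L -> 9L) // vertex 9 does not exist
```

Several validators can be applied together with forall, for example Seq(endpointsValidator, noSelfLoopsValidator).forall(_.validate(ids, goodEdges)).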
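/**
 * Validate the graph using a GraphValidator.
 * @param validator the validator to apply
 * @return true if the graph is valid according to the validator, false otherwise
 */
def validate(validator: GraphValidator[K, VV, EV]): Boolean

The iteration methods use these function classes from the underlying Java Gelly library. In Java they are abstract classes; they are shown here as simplified Scala traits. GSAGatherFunction stands for org.apache.flink.graph.gsa.GatherFunction, renamed to avoid a clash with the scatter-gather GatherFunction in org.apache.flink.graph.spargel.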
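// Scatter-Gather Functions (org.apache.flink.graph.spargel)
trait ScatterFunction[K, VV, M, EV] {
  // Sends messages with sendMessageTo(target, message); edges are available via getEdges
  def sendMessages(vertex: Vertex[K, VV]): Unit
}
trait GatherFunction[K, VV, M] {
  // Returns nothing; a new vertex value is set with setNewVertexValue(value)
  def updateVertex(vertex: Vertex[K, VV], inMessages: MessageIterator[M]): Unit
}
// Gather-Sum-Apply Functions (org.apache.flink.graph.gsa)
trait GSAGatherFunction[VV, EV, M] {
  def gather(neighbor: Neighbor[VV, EV]): M
}
trait SumFunction[VV, EV, M] {
  def sum(arg0: M, arg1: M): M
}
trait ApplyFunction[K, VV, M] {
  // Returns nothing; the new vertex value is set with setResult(value)
  def apply(newValue: M, currentValue: VV): Unit
}
// Vertex-Centric Functions (org.apache.flink.graph.pregel)
trait ComputeFunction[K, VV, EV, M] {
  // Sends messages with sendMessageTo(target, message); updates the value with setNewVertexValue(value)
  def compute(vertex: Vertex[K, VV], messages: MessageIterator[M]): Unit
}
trait MessageCombiner[K, M] {
  // Combines the messages for one target vertex and emits the result with sendCombinedMessage(message)
  def combineMessages(messages: MessageIterator[M]): Unit
}
// Validation (org.apache.flink.graph.validation)
trait GraphValidator[K, VV, EV] {
  // Receives the underlying Java org.apache.flink.graph.Graph
  def validate(graph: Graph[K, VV, EV]): Boolean
}

Algorithm behavior can be customized using configuration objects: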
// Configuration classes (from Java Gelly)
class ScatterGatherConfiguration  // org.apache.flink.graph.spargel
class GSAConfiguration            // org.apache.flink.graph.gsa
class VertexCentricConfiguration  // org.apache.flink.graph.pregel

Usage Examples:
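import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.Vertex
// Explicit imports avoid the name clash between spargel.GatherFunction and gsa.GatherFunction
import org.apache.flink.graph.spargel.{GatherFunction, MessageIterator, ScatterFunction}
import org.apache.flink.graph.gsa.{ApplyFunction, Neighbor, SumFunction, GatherFunction => GSAGatherFunction}
import scala.collection.JavaConverters._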
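// Example: Single Source Shortest Paths using Scatter-Gather
// Assumes an existing graph: Graph[Long, Double, Double] whose edge values are
// non-negative weights and whose vertex values are 0.0 at the source vertex and
// Double.PositiveInfinity everywhere else.
final class SSSPScatter extends ScatterFunction[Long, Double, Double, Double] {
  override def sendMessages(vertex: Vertex[Long, Double]): Unit = {
    // Only vertices that have been reached offer a distance to their neighbors
    if (vertex.getValue < Double.PositiveInfinity) {
      for (edge <- getEdges.asScala) {
        sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
      }
    }
  }
}
final class SSSPGather extends GatherFunction[Long, Double, Double] {
  override def updateVertex(vertex: Vertex[Long, Double],
                            inMessages: MessageIterator[Double]): Unit = {
    var minDistance = Double.PositiveInfinity
    while (inMessages.hasNext) {
      val msg = inMessages.next()
      if (msg < minDistance) {
        minDistance = msg
      }
    }
    // Update only on improvement, so the iteration converges once no distance changes
    if (minDistance < vertex.getValue) {
      setNewVertexValue(minDistance)
    }
  }
}
// Run SSSP for at most 10 iterations
val sourceId = 1L // the vertex initialized to 0.0
val scatterFunction = new SSSPScatter()
val gatherFunction = new SSSPGather()
val result = graph.runScatterGatherIteration(scatterFunction, gatherFunction, 10)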
// Example: PageRank using Gather-Sum-Apply
// Assumes each edge value has been set to 1 / (out-degree of its source vertex)
// and each vertex value starts at 1.0.
final class PageRankGather extends GSAGatherFunction[Double, Double, Double] {
  // Neighbor exposes the neighbor's value and the connecting edge's value
  override def gather(neighbor: Neighbor[Double, Double]): Double =
    neighbor.getNeighborValue * neighbor.getEdgeValue
}
final class PageRankSum extends SumFunction[Double, Double, Double] {
  override def sum(arg0: Double, arg1: Double): Double = arg0 + arg1
}
final class PageRankApply(dampingFactor: Double) extends ApplyFunction[Long, Double, Double] {
  // apply returns nothing; the new rank is emitted with setResult
  override def apply(newValue: Double, currentValue: Double): Unit =
    setResult((1.0 - dampingFactor) + dampingFactor * newValue)
}
// Run PageRank for 10 iterations
val gatherFunc = new PageRankGather()
val sumFunc = new PageRankSum()
val applyFunc = new PageRankApply(0.85)
val pageRankResult = graph.runGatherSumApplyIteration(gatherFunc, sumFunc, applyFunc, 10)
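// Example: Using built-in algorithms
import org.apache.flink.graph.library._
// SingleSourceShortestPaths(srcVertexId, maxIterations) expects Double edge values
val ssspAlgorithm = new SingleSourceShortestPaths[Long, Double](sourceId, 10)
val distances = graph.run(ssspAlgorithm)
// K stands for a key type accepted by the built-in PageRank. Its package and key-type
// requirements differ across Gelly versions (recent releases place it in
// org.apache.flink.graph.library.linkanalysis and expect Value-typed keys such as LongValue).
val pageRankAlgorithm = new PageRank[K, Double, Double](0.85, 10)
val pageRankValues = graph.run(pageRankAlgorithm)

Best for algorithms where: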