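Abstract base classes for implementing custom graph processing functions over a vertex's edges or neighborhood, optionally with access to the vertex's own value. These classes adapt the underlying Java Gelly function interfaces so that user code works with Scala collections and Scala tuples instead of Java iterables and Flink tuples.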
Base class for processing edges of vertices without access to vertex values.
/**
 * Abstract class for processing the edges of a vertex.
 *
 * Subclasses implement only the Scala-typed `iterateEdges` overload, which
 * receives the edges as a Scala `Iterable` of Scala pairs. The Java-typed
 * overload required by `org.apache.flink.graph.EdgesFunction` is implemented
 * here: it converts every Flink `Tuple2` into a Scala pair and forwards the
 * result to the Scala overload.
 *
 * @tparam K  type of the vertex IDs
 * @tparam EV type of the edge values
 * @tparam T  type of the emitted results
 */
abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {

  import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
  import scala.collection.JavaConverters._

  /**
   * Process the edges of a vertex and emit results.
   *
   * @param edges iterable of (vertexId, edge) pairs representing the edges of the vertex
   * @param out   collector for emitting results
   */
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit

  /**
   * Java-interface entry point: adapts its arguments and delegates to the
   * Scala-typed overload above.
   *
   * `map` on the wrapped Java iterable is strict. The converted pairs are
   * therefore copied into a Scala collection, which the Scala overload may
   * traverse more than once.
   */
  override def iterateEdges(
      edges: java.lang.Iterable[JTuple2[K, Edge[K, EV]]],
      out: Collector[T]): Unit = {
    val scalaEdges = edges.asScala.map(javaPair => (javaPair.f0, javaPair.f1))
    iterateEdges(scalaEdges, out)
  }
}
// Base class for processing edges of vertices with access to the vertex value.
/**
 * Abstract class for processing the edges of a vertex with access to the vertex value.
 *
 * Subclasses implement only the Scala-typed `iterateEdges` overload, which
 * receives the edges as a Scala `Iterable`. The Java-typed overload required by
 * `org.apache.flink.graph.EdgesFunctionWithVertexValue` is implemented here and
 * forwards to it.
 *
 * @tparam K  type of the vertex IDs
 * @tparam VV type of the vertex values
 * @tparam EV type of the edge values
 * @tparam T  type of the emitted results
 */
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] extends
    org.apache.flink.graph.EdgesFunctionWithVertexValue[K, VV, EV, T] {

  import scala.collection.JavaConverters._

  /**
   * Process the edges of a vertex with access to vertex value and emit results.
   *
   * @param v     the vertex whose edges are being processed
   * @param edges iterable of edges connected to the vertex
   * @param out   collector for emitting results
   */
  @throws(classOf[Exception])
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit

  /**
   * Java-interface entry point: wraps the Java iterable as a Scala `Iterable`
   * and delegates to the Scala-typed overload above.
   *
   * NOTE(review): `asScala` wraps without copying. If the runtime hands over an
   * Iterable that can be traversed only once, the Scala view is also single-pass.
   * Confirm this before traversing `edges` twice in a subclass.
   */
  @throws(classOf[Exception])
  override def iterateEdges(
      v: Vertex[K, VV],
      edges: java.lang.Iterable[Edge[K, EV]],
      out: Collector[T]): Unit =
    iterateEdges(v, edges.asScala, out)
}
// Base class for processing neighbors (edges and adjacent vertices) of vertices.
/**
 * Abstract class for processing the neighbors of a vertex.
 *
 * Provides access to both the edges and the adjacent vertices. Subclasses
 * implement only the Scala-typed `iterateNeighbors` overload, which receives
 * Scala triples. The Java-typed overload required by
 * `org.apache.flink.graph.NeighborsFunction` is implemented here: it converts
 * every Flink `Tuple3` into a Scala triple and forwards the result.
 *
 * @tparam K  type of the vertex IDs
 * @tparam VV type of the vertex values
 * @tparam EV type of the edge values
 * @tparam T  type of the emitted results
 */
abstract class NeighborsFunction[K, VV, EV, T] extends
    org.apache.flink.graph.NeighborsFunction[K, VV, EV, T] {

  import org.apache.flink.api.java.tuple.{Tuple3 => JTuple3}
  import scala.collection.JavaConverters._

  /**
   * Process the neighbors of a vertex and emit results.
   *
   * @param neighbors iterable of (vertexId, edge, neighborVertex) tuples
   * @param out       collector for emitting results
   */
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])],
      out: Collector[T]): Unit

  /**
   * Java-interface entry point: adapts its arguments and delegates to the
   * Scala-typed overload above.
   *
   * `map` on the wrapped Java iterable is strict, so the converted triples are
   * copied into a Scala collection that may be traversed more than once.
   */
  override def iterateNeighbors(
      neighbors: java.lang.Iterable[JTuple3[K, Edge[K, EV], Vertex[K, VV]]],
      out: Collector[T]): Unit = {
    val scalaNeighbors = neighbors.asScala.map(javaTriple =>
      (javaTriple.f0, javaTriple.f1, javaTriple.f2))
    iterateNeighbors(scalaNeighbors, out)
  }
}
// Base class for processing neighbors with access to the central vertex value.
/**
 * Abstract class for processing the neighbors of a vertex with access to the vertex value.
 *
 * Provides access to the central vertex, the edges, and the adjacent vertices.
 * Subclasses implement only the Scala-typed `iterateNeighbors` overload, which
 * receives Scala pairs. The Java-typed overload required by
 * `org.apache.flink.graph.NeighborsFunctionWithVertexValue` is implemented
 * here: it converts every Flink `Tuple2` into a Scala pair and forwards the result.
 *
 * @tparam K  type of the vertex IDs
 * @tparam VV type of the vertex values
 * @tparam EV type of the edge values
 * @tparam T  type of the emitted results
 */
abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] extends
    org.apache.flink.graph.NeighborsFunctionWithVertexValue[K, VV, EV, T] {

  import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
  import scala.collection.JavaConverters._

  /**
   * Process the neighbors of a vertex with access to vertex value and emit results.
   *
   * @param vertex    the central vertex whose neighbors are being processed
   * @param neighbors iterable of (edge, neighborVertex) pairs
   * @param out       collector for emitting results
   */
  def iterateNeighbors(vertex: Vertex[K, VV],
      neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])],
      out: Collector[T]): Unit

  /**
   * Java-interface entry point: adapts its arguments and delegates to the
   * Scala-typed overload above.
   *
   * `map` on the wrapped Java iterable is strict, so the converted pairs are
   * copied into a Scala collection that may be traversed more than once.
   */
  override def iterateNeighbors(
      vertex: Vertex[K, VV],
      neighbors: java.lang.Iterable[JTuple2[Edge[K, EV], Vertex[K, VV]]],
      out: Collector[T]): Unit = {
    val scalaNeighbors = neighbors.asScala.map(javaPair => (javaPair.f0, javaPair.f1))
    iterateNeighbors(vertex, scalaNeighbors, out)
  }
}
// Process edges without considering vertex values:
import org.apache.flink.graph.scala._
import org.apache.flink.graph.EdgeDirection
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.util.Collector
// Emit (vertexId, numberOfEdges) for every vertex that has at least one edge in
// the requested direction. Applied with EdgeDirection.OUT below, so it counts
// outgoing edges. The edges are walked once and never buffered.
class EdgeCounter extends EdgesFunction[Long, Double, (Long, Int)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])],
                            out: Collector[(Long, Int)]): Unit = {
    val remaining = edges.iterator
    // A vertex with no edges produces no record.
    if (remaining.hasNext) {
      val ownerId = remaining.next()._1
      // The first edge has already been consumed, hence the `1 +`.
      out.collect((ownerId, 1 + remaining.size))
    }
  }
}

// Apply the function
val edgeCounts = graph.groupReduceOnEdges(
  new EdgeCounter(),
  EdgeDirection.OUT
)
// Process edges while considering the vertex value:
// Forward only the edges whose value is at least half of the vertex's own value.
class EdgeFilter extends EdgesFunctionWithVertexValue[Long, Double, Double, Edge[Long, Double]] {
  override def iterateEdges(v: Vertex[Long, Double],
                            edges: Iterable[Edge[Long, Double]],
                            out: Collector[Edge[Long, Double]]): Unit = {
    // The cutoff depends only on the vertex, so compute it once up front.
    val cutoff = v.getValue * 0.5
    for (edge <- edges if edge.getValue >= cutoff) out.collect(edge)
  }
}

// Apply the function
val filteredEdges = graph.groupReduceOnEdges(
  new EdgeFilter(),
  EdgeDirection.OUT
)
// Process neighbors to compute local graph properties:
// Emit (vertexId, average value of the neighboring vertices) for every vertex
// that has at least one neighbor. This equals the "average neighbor degree"
// only when the vertex values hold degrees.
class NeighborDegreeAnalysis extends NeighborsFunction[Long, Double, Double, (Long, Double)] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double)]): Unit = {
    // Materialize once; every tuple carries the same central vertex ID in `_1`.
    val neighborList = neighbors.toList
    neighborList.headOption.foreach { first =>
      // Accumulate sum and count in one fold instead of building extra lists.
      val (valueSum, neighborCount) = neighborList.foldLeft((0.0, 0)) {
        case ((sum, count), (_, _, neighbor)) => (sum + neighbor.getValue, count + 1)
      }
      out.collect((first._1, valueSum / neighborCount))
    }
  }
}
// Apply the function
val neighborAnalysis = graph.groupReduceOnNeighbors(new NeighborDegreeAnalysis(), EdgeDirection.ALL)
// Perform complex analysis considering both the central vertex and its neighbors:
// Summarize each vertex's neighborhood as
// (vertexId, vertexValue, neighborCount, averageEdgeWeight).
// This is a count/weight summary, not a clustering coefficient: no triangles
// or neighbor-to-neighbor edges are examined.
class LocalStructureAnalysis extends NeighborsFunctionWithVertexValue[Long, Double, Double, (Long, Double, Int, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, Double],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Double, Int, Double)]): Unit = {
    // One traversal yields both the neighbor count and the total edge weight.
    val (neighborCount, weightSum) = neighbors.foldLeft((0, 0.0)) {
      case ((count, sum), (edge, _)) => (count + 1, sum + edge.getValue)
    }
    // Vertices without neighbors emit nothing (this also avoids dividing by zero).
    if (neighborCount > 0) {
      out.collect((vertex.getId, vertex.getValue, neighborCount, weightSum / neighborCount))
    }
  }
}
// Apply the function
val structureAnalysis = graph.groupReduceOnNeighbors(
  new LocalStructureAnalysis(),
  EdgeDirection.ALL
)
// Functions can emit multiple results per vertex:
// Emit one record per neighbor:
// (vertexId, vertexValue, neighborValue, connectingEdgeValue).
class DetailedNeighborStats extends NeighborsFunctionWithVertexValue[Long, String, Double, (Long, String, String, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, String],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, String])],
                                out: Collector[(Long, String, String, Double)]): Unit =
    neighbors.iterator
      .collect { case (edge, neighbor) =>
        (vertex.getId, vertex.getValue, neighbor.getValue, edge.getValue)
      }
      .foreach(out.collect)
}
// Implement conditional logic based on graph structure:
// Process high-degree vertices differently from low-degree ones.
//
// A vertex with strictly more than `highDegreeThreshold` edges emits
// (id, "high-degree", average edge weight). Any other vertex emits
// (id, "low-degree", maximum edge weight). A vertex with no edges is reported
// as low-degree with weight 0.0.
//
// @param highDegreeThreshold edge count above which a vertex counts as
//                            high-degree (defaults to the original value, 10)
class AdaptiveEdgeProcessor(highDegreeThreshold: Int = 10)
    extends EdgesFunctionWithVertexValue[Long, Double, Double, (Long, String, Double)] {
  override def iterateEdges(v: Vertex[Long, Double],
                            edges: Iterable[Edge[Long, Double]],
                            out: Collector[(Long, String, Double)]): Unit = {
    // Extract the weights once; both branches only need the values.
    val weights = edges.iterator.map(_.getValue).toList
    val edgeCount = weights.size
    val analysis =
      if (edgeCount > highDegreeThreshold) {
        // High-degree vertex: average weight (edgeCount > 0 here).
        (v.getId, "high-degree", weights.sum / edgeCount)
      } else {
        // Low-degree vertex: maximum weight, or 0.0 when there are no edges.
        (v.getId, "low-degree", if (weights.nonEmpty) weights.max else 0.0)
      }
    out.collect(analysis)
  }
}
// Maintain state across processing within a single function call:
// Build, for each vertex, a histogram of its edges by weight bucket
// ("weak" / "medium" / "strong") and emit (vertexId, histogram).
class EdgePatternAnalyzer extends NeighborsFunction[Long, Double, Double, (Long, Map[String, Int])] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])],
                                out: Collector[(Long, Map[String, Int])]): Unit = {
    val snapshot = neighbors.toList
    // Only vertices with at least one neighbor produce a record.
    snapshot.headOption.foreach { first =>
      // The fold's accumulator is the per-call state: an immutable map of counts.
      val histogram = snapshot.iterator
        .collect { case (_, edge, _) => weightBucket(edge.getValue) }
        .foldLeft(Map.empty[String, Int]) { (counts, bucket) =>
          counts.updated(bucket, counts.getOrElse(bucket, 0) + 1)
        }
      out.collect((first._1, histogram))
    }
  }

  // Bucket an edge weight: below 0.3 is "weak", below 0.7 is "medium",
  // and anything else (including NaN) is "strong".
  private def weightBucket(weight: Double): String = weight match {
    case w if w < 0.3 => "weak"
    case w if w < 0.7 => "medium"
    case _ => "strong"
  }
}
// Example using Scala idioms
class ScalaIdiomsExample extends NeighborsFunctionWithVertexValue[Long, String, Double, (Long, Double)] {
  // Emit (vertexId, total length of the values of neighbors reached over edges
  // heavier than 0.5), or 0.0 when no such neighbor exists.
  override def iterateNeighbors(vertex: Vertex[Long, String],
                                neighbors: Iterable[(Edge[Long, Double], Vertex[Long, String])],
                                out: Collector[(Long, Double)]): Unit = {
    val strongNeighborNameLengths = neighbors.iterator
      .filter(pair => pair._1.getValue > 0.5)         // strong edges only
      .map(pair => pair._2.getValue.length.toDouble)  // neighbor value (name) length
    // reduceOption is None on an empty input, so the default 0.0 applies safely.
    val total = strongNeighborNameLengths.reduceOption(_ + _).getOrElse(0.0)
    out.collect((vertex.getId, total))
  }
}
// These user-defined functions provide the foundation for implementing custom graph
// algorithms and analytics while maintaining the performance and scalability benefits
// of Flink's distributed processing engine.