User-Defined Functions

Abstract base classes for implementing custom graph processing functions with access to vertex values, edges, and neighborhoods. These classes provide Scala-friendly interfaces for the underlying Java Gelly function types.

Capabilities

EdgesFunction

Base class for processing edges of vertices without access to vertex values.

/**
 * Abstract class for processing edges of a vertex.
 * Provides Scala collections interface for edge iteration.
 */
abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {
  /**
   * Process the edges of a vertex and emit results.
   * @param edges iterable of (vertexId, edge) pairs representing the edges of the vertex
   * @param out collector for emitting results
   */
  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T]): Unit
}

EdgesFunctionWithVertexValue

Base class for processing edges of vertices with access to the vertex value.

/**
 * Abstract class for processing edges of a vertex with access to the vertex value.
 * Provides Scala collections interface for edge iteration.
 */
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] extends 
    org.apache.flink.graph.EdgesFunctionWithVertexValue[K, VV, EV, T] {
  /**
   * Process the edges of a vertex with access to vertex value and emit results.
   * @param v the vertex whose edges are being processed
   * @param edges iterable of edges connected to the vertex
   * @param out collector for emitting results
   */
  @throws(classOf[Exception])
  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T]): Unit
}

NeighborsFunction

Base class for processing neighbors (edges and adjacent vertices) of vertices.

/**
 * Abstract class for processing neighbors of a vertex.
 * Provides access to both edges and adjacent vertex information.
 * Provides Scala collections interface for neighbor iteration.
 */
abstract class NeighborsFunction[K, VV, EV, T] extends 
    org.apache.flink.graph.NeighborsFunction[K, VV, EV, T] {
  /**
   * Process the neighbors of a vertex and emit results.
   * @param neighbors iterable of (vertexId, edge, neighborVertex) tuples
   * @param out collector for emitting results
   */
  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], 
                       out: Collector[T]): Unit
}

NeighborsFunctionWithVertexValue

Base class for processing neighbors with access to the central vertex value.

/**
 * Abstract class for processing neighbors of a vertex with access to the vertex value.
 * Provides access to the central vertex, edges, and adjacent vertex information.
 * Provides Scala collections interface for neighbor iteration.
 */
abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] extends 
    org.apache.flink.graph.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
  /**
   * Process the neighbors of a vertex with access to vertex value and emit results.
   * @param vertex the central vertex whose neighbors are being processed
   * @param neighbors iterable of (edge, neighborVertex) pairs
   * @param out collector for emitting results
   */
  def iterateNeighbors(vertex: Vertex[K, VV], 
                       neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])], 
                       out: Collector[T]): Unit
}

Usage Patterns

Simple Edge Processing

Process edges without considering vertex values:

import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, Vertex}
import org.apache.flink.util.Collector

// Count outgoing edges per vertex
class EdgeCounter extends EdgesFunction[Long, Double, (Long, Int)] {
  override def iterateEdges(edges: Iterable[(Long, Edge[Long, Double])], 
                           out: Collector[(Long, Int)]): Unit = {
    val edgeList = edges.toList
    if (edgeList.nonEmpty) {
      val vertexId = edgeList.head._1
      out.collect((vertexId, edgeList.size))
    }
  }
}

// Apply the function
val edgeCounts = graph.groupReduceOnEdges(new EdgeCounter(), EdgeDirection.OUT)

Edge Processing with Vertex Context

Process edges while considering the vertex value:

// Keep only edges whose weight is at least half the value of the vertex being processed
class EdgeFilter extends EdgesFunctionWithVertexValue[Long, Double, Double, Edge[Long, Double]] {
  override def iterateEdges(v: Vertex[Long, Double], 
                           edges: Iterable[Edge[Long, Double]], 
                           out: Collector[Edge[Long, Double]]): Unit = {
    val threshold = v.getValue
    for (edge <- edges) {
      if (edge.getValue >= threshold * 0.5) {
        out.collect(edge)
      }
    }
  }
}

// Apply the function
val filteredEdges = graph.groupReduceOnEdges(new EdgeFilter(), EdgeDirection.OUT)

Neighbor Analysis

Process neighbors to compute local graph properties:

// Compute the average value of each vertex's neighbors (vertex values, not degrees)
class NeighborDegreeAnalysis extends NeighborsFunction[Long, Double, Double, (Long, Double)] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])], 
                               out: Collector[(Long, Double)]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val vertexId = neighborList.head._1
      val neighborValues = neighborList.map(_._3.getValue)
      val avgNeighborValue = neighborValues.sum / neighborValues.size
      
      out.collect((vertexId, avgNeighborValue))
    }
  }
}

// Apply the function
val neighborAnalysis = graph.groupReduceOnNeighbors(new NeighborDegreeAnalysis(), EdgeDirection.ALL)

Advanced Neighbor Processing with Vertex Context

Perform complex analysis considering both the central vertex and its neighbors:

// Summarize each neighborhood: vertex value, neighbor count, and average edge weight
class LocalStructureAnalysis extends NeighborsFunctionWithVertexValue[Long, Double, Double, (Long, Double, Int, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, Double], 
                               neighbors: Iterable[(Edge[Long, Double], Vertex[Long, Double])], 
                               out: Collector[(Long, Double, Int, Double)]): Unit = {
    val neighborList = neighbors.toList
    val vertexValue = vertex.getValue
    val neighborCount = neighborList.size
    
    if (neighborCount > 0) {
      val avgEdgeWeight = neighborList.map(_._1.getValue).sum / neighborCount
      val result = (vertex.getId, vertexValue, neighborCount, avgEdgeWeight)
      out.collect(result)
    }
  }
}

// Apply the function
val structureAnalysis = graph.groupReduceOnNeighbors(
  new LocalStructureAnalysis(), 
  EdgeDirection.ALL
)

Advanced Usage Examples

Multi-Output Functions

Functions can emit multiple results per vertex:

// Emit statistics for each neighbor relationship
class DetailedNeighborStats extends NeighborsFunctionWithVertexValue[Long, String, Double, (Long, String, String, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, String], 
                               neighbors: Iterable[(Edge[Long, Double], Vertex[Long, String])], 
                               out: Collector[(Long, String, String, Double)]): Unit = {
    for ((edge, neighbor) <- neighbors) {
      val result = (vertex.getId, vertex.getValue, neighbor.getValue, edge.getValue)
      out.collect(result)
    }
  }
}

Conditional Processing

Implement conditional logic based on graph structure:

// Process high-degree vertices differently
class AdaptiveEdgeProcessor extends EdgesFunctionWithVertexValue[Long, Double, Double, (Long, String, Double)] {
  override def iterateEdges(v: Vertex[Long, Double], 
                           edges: Iterable[Edge[Long, Double]], 
                           out: Collector[(Long, String, Double)]): Unit = {
    val edgeList = edges.toList
    val edgeCount = edgeList.size
    
    val analysis = if (edgeCount > 10) {
      // High-degree vertex: compute average
      val avgWeight = edgeList.map(_.getValue).sum / edgeCount
      (v.getId, "high-degree", avgWeight)
    } else {
      // Low-degree vertex: compute max
      val maxWeight = if (edgeList.nonEmpty) edgeList.map(_.getValue).max else 0.0
      (v.getId, "low-degree", maxWeight)
    }
    
    out.collect(analysis)
  }
}

Stateful Processing

Accumulate state locally while processing a single vertex's neighborhood (the state lives only for that one function call):

// Track edge patterns within neighborhood
class EdgePatternAnalyzer extends NeighborsFunction[Long, Double, Double, (Long, Map[String, Int])] {
  override def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Double], Vertex[Long, Double])], 
                               out: Collector[(Long, Map[String, Int])]): Unit = {
    val neighborList = neighbors.toList
    if (neighborList.nonEmpty) {
      val vertexId = neighborList.head._1
      
      // Categorize edges by weight ranges
      val patterns = scala.collection.mutable.Map[String, Int]()
      
      for ((_, edge, _) <- neighborList) {
        val category = edge.getValue match {
          case w if w < 0.3 => "weak"
          case w if w < 0.7 => "medium"  
          case _ => "strong"
        }
        patterns(category) = patterns.getOrElse(category, 0) + 1
      }
      
      out.collect((vertexId, patterns.toMap))
    }
  }
}
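
The same counting can be done without a mutable map. The following is a minimal sketch, not part of the Gelly API: `EdgeWeightCategories`, `categoryOf`, and `countByCategory` are hypothetical names, and the helper works on plain weights so it can be compiled and tested without Flink on the classpath. The thresholds are copied from EdgePatternAnalyzer.

```scala
object EdgeWeightCategories {
  // Hypothetical helper: same thresholds as EdgePatternAnalyzer.
  def categoryOf(weight: Double): String =
    if (weight < 0.3) "weak"
    else if (weight < 0.7) "medium"
    else "strong"

  // Fold the weights into an immutable Map in one pass, with no mutable state.
  def countByCategory(weights: Iterable[Double]): Map[String, Int] =
    weights.foldLeft(Map.empty[String, Int]) { (counts, weight) =>
      val category = categoryOf(weight)
      counts.updated(category, counts.getOrElse(category, 0) + 1)
    }
}
```

Inside iterateNeighbors, a function could pass the edge values of its neighborhood to such a helper and emit the resulting map. Keeping the logic in a plain function also makes it easy to unit test separately from the Flink job.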

Function Design Best Practices

Performance Optimization

  • Lazy Evaluation: Use iterators or views when chaining transformations so that intermediate collections are not built unnecessarily
  • Memory Management: Avoid collecting large neighborhoods into memory unnecessarily
  • Type Specialization: Use primitive types when possible to avoid boxing overhead
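
Several examples in this document call toList before aggregating, which copies the whole neighborhood. When only an aggregate is needed, a single traversal is enough. The sketch below shows that idea in plain Scala. `SinglePassStats` and `meanWeight` are hypothetical names, not Gelly API, and whether the saving matters depends on how large the neighborhoods are.

```scala
object SinglePassStats {
  // Hypothetical helper: mean of the weights, or None for an empty neighborhood.
  // Works on the iterator directly, so the input is traversed once and never
  // copied into a List.
  def meanWeight(weights: Iterable[Double]): Option[Double] = {
    val (sum, count) = weights.iterator.foldLeft((0.0, 0L)) {
      case ((runningSum, runningCount), weight) =>
        (runningSum + weight, runningCount + 1)
    }
    if (count == 0) None else Some(sum / count)
  }
}
```

Returning an Option leaves the empty-neighborhood decision to the caller, which can skip the vertex or emit a default value.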

Error Handling

  • Null Safety: Check for null values in vertex and edge data
  • Empty Collections: Handle cases where vertices have no edges or neighbors
  • Exception Management: Use appropriate exception handling within functions
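
As an illustration of the first two points, the sketch below drops null values and treats an empty result as a normal case rather than an error. `SafeNeighborhood` and `maxWeight` are hypothetical names, not Gelly API, and the helper takes boxed `java.lang.Double` values only so that nulls can be represented. Whether nulls can actually occur depends on how the graph was built.

```scala
object SafeNeighborhood {
  // Hypothetical helper: largest non-null weight, or None if nothing usable remains.
  def maxWeight(weights: Iterable[java.lang.Double]): Option[Double] =
    weights.iterator
      .filter(_ != null)                        // skip missing values
      .map(_.doubleValue)                       // unbox to a primitive Double
      .reduceOption((a, b) => math.max(a, b))   // None for an empty input
}
```

A function that uses such a helper can decide explicitly what to emit when the result is None, instead of falling back to a value such as 0.0 that could be mistaken for real data.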

Scala Idioms

  • Pattern Matching: Use Scala's pattern matching for elegant conditional logic
  • Collection Operations: Leverage Scala's rich collection API (map, filter, reduce, etc.)
  • Functional Style: Prefer immutable data structures and functional transformations

// Example using Scala idioms
class ScalaIdiomsExample extends NeighborsFunctionWithVertexValue[Long, String, Double, (Long, Double)] {
  override def iterateNeighbors(vertex: Vertex[Long, String], 
                               neighbors: Iterable[(Edge[Long, Double], Vertex[Long, String])], 
                               out: Collector[(Long, Double)]): Unit = {
    val result = neighbors
      .filter(_._1.getValue > 0.5)  // Filter strong edges
      .map(_._2.getValue.length.toDouble)  // Map to vertex name lengths
      .reduceOption(_ + _)  // Sum with safe reduction
      .getOrElse(0.0)  // Default value
    
    out.collect((vertex.getId, result))
  }
}

These user-defined functions provide the foundation for implementing custom graph algorithms and analytics while maintaining the performance and scalability benefits of Flink's distributed processing engine.