
Graph Transformations

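Type-safe operations for transforming vertex and edge values, filtering vertices and edges, and changing graph structure. Every operation returns a new Graph whose vertex and edge DataSets are derived from the original with ordinary Flink transformations, so the work runs in parallel like any other Flink DataSet program.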

Capabilities

Vertex Transformations

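Apply a function to each vertex to compute a new vertex value, whose type NV may differ from VV. The function receives the whole Vertex, so it can read the vertex ID as well as its value. Vertex IDs and all edges are left unchanged.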

/**
 * Apply a function to the attribute of each vertex in the graph.
 * @param mapper the map function to apply.
 * @return a new graph
 */
def mapVertices[NV](mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV]

/**
 * Apply a function to the attribute of each vertex in the graph.
 * @param fun the map function to apply.
 * @return a new graph
 */
def mapVertices[NV](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
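The contract above can be illustrated with a minimal plain-Scala model. This is not Gelly code: SimpleGraph is a hypothetical stand-in that stores vertices as an ID-to-value Map and edges as (source, target, value) triples, and its vertex function takes the ID and value as two arguments instead of a Vertex object. It only shows that vertex values change while IDs and edges pass through untouched.

```scala
// Hypothetical plain-Scala model of a graph (not Flink Gelly):
// vertices map an ID to a value; edges are (source, target, value) triples.
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Models mapVertices: the function sees the vertex ID and value,
  // only the value is replaced, and the edge list is reused as-is.
  def mapVertices[NV](fun: (K, VV) => NV): SimpleGraph[K, NV, EV] =
    SimpleGraph(vertices.map { case (id, value) => id -> fun(id, value) }, edges)
}

val graph = SimpleGraph(Map(1L -> "Alice", 2L -> "Bob"), Seq((1L, 2L, 0.5)))

// String values become Int values; the result type records the change.
val lengths: SimpleGraph[Long, Int, Double] = graph.mapVertices((_, name) => name.length)
// lengths.vertices == Map(1L -> 5, 2L -> 3), lengths.edges == Seq((1L, 2L, 0.5))
```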

Edge Transformations

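Apply a function to each edge to compute a new edge value, whose type NV may differ from EV. The function receives the whole Edge (source ID, target ID, and value). Edge endpoints and all vertices are left unchanged.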

/**
 * Apply a function to the attribute of each edge in the graph.
 * @param mapper the map function to apply.
 * @return a new graph
 */
def mapEdges[NV](mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV]

/**
 * Apply a function to the attribute of each edge in the graph.
 * @param fun the map function to apply.
 * @return a new graph
 */
def mapEdges[NV](fun: Edge[K, EV] => NV): Graph[K, VV, NV]
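The same kind of plain-Scala model, again a hypothetical SimpleGraph rather than the Gelly Graph, can sketch mapEdges: the function sees source, target, and value, but only the value is replaced. The 0.4 threshold is an arbitrary illustration value.

```scala
// Hypothetical plain-Scala model (not Flink Gelly): edges are (source, target, value).
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Models mapEdges: the function sees source, target, and value,
  // only the value is replaced, and the vertices are reused as-is.
  def mapEdges[NV](fun: (K, K, EV) => NV): SimpleGraph[K, VV, NV] =
    SimpleGraph(vertices, edges.map { case (src, trg, value) => (src, trg, fun(src, trg, value)) })
}

val graph = SimpleGraph(
  Map(1L -> "Alice", 2L -> "Bob", 3L -> "Charlie"),
  Seq((1L, 2L, 0.5), (2L, 3L, 0.3)))

// Double weights become Int labels: 1 above the threshold, 0 otherwise.
val labels = graph.mapEdges((_, _, weight) => if (weight > 0.4) 1 else 0)
// labels.edges == Seq((1L, 2L, 1), (2L, 3L, 0))
```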

ID and Value Translation

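Translate vertex IDs, vertex values, or edge values with either a TranslateFunction or a Scala function of the form (value, reuse) => result. translateGraphIds applies the translation to the vertex IDs and to the source and target IDs of every edge, so the graph stays consistent. The second argument, reuse, is an instance of the output type that a translator may fill in and return to avoid allocating a new object; it may be null, so a translator that simply builds a new value can ignore it. Unlike mapVertices and mapEdges, value translators receive only the value, not the enclosing Vertex or Edge.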

/**
 * Translate vertex and edge IDs using the given TranslateFunction.
 * @param translator implements conversion from K to NEW
 * @return graph with translated vertex and edge IDs
 */
def translateGraphIds[NEW](translator: TranslateFunction[K, NEW]): Graph[NEW, VV, EV]

/**
 * Translate vertex and edge IDs using the given function.
 * @param fun implements conversion from K to NEW
 * @return graph with translated vertex and edge IDs
 */
def translateGraphIds[NEW](fun: (K, NEW) => NEW): Graph[NEW, VV, EV]
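As a rough illustration of how one ID translation has to be applied everywhere an ID appears, here is a plain-Scala sketch. SimpleGraph is hypothetical, not Gelly, and because the model never has an object to reuse it always passes null as the reuse argument; the "v" prefix is an arbitrary example.

```scala
// Hypothetical plain-Scala model (not Flink Gelly).
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Models translateGraphIds with a (value, reuse) => result function.
  // The same translation is applied to vertex IDs and to both edge endpoints.
  // This sketch never has an object to reuse, so it always passes null.
  def translateGraphIds[NEW](fun: (K, NEW) => NEW): SimpleGraph[NEW, VV, EV] = {
    val translate = (id: K) => fun(id, null.asInstanceOf[NEW])
    SimpleGraph(
      vertices.map { case (id, value) => translate(id) -> value },
      edges.map { case (src, trg, value) => (translate(src), translate(trg), value) })
  }
}

val graph = SimpleGraph(Map(1L -> "Alice", 2L -> "Bob"), Seq((1L, 2L, 0.5)))

// Long IDs become String IDs; the reuse argument is ignored.
val renamed = graph.translateGraphIds[String]((id, _) => "v" + id)
// renamed.vertices == Map("v1" -> "Alice", "v2" -> "Bob")
// renamed.edges == Seq(("v1", "v2", 0.5))
```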

/**
 * Translate vertex values using the given TranslateFunction.
 * @param translator implements conversion from VV to NEW
 * @return graph with translated vertex values
 */
def translateVertexValues[NEW](translator: TranslateFunction[VV, NEW]): Graph[K, NEW, EV]

/**
 * Translate vertex values using the given function.
 * @param fun implements conversion from VV to NEW
 * @return graph with translated vertex values
 */
def translateVertexValues[NEW](fun: (VV, NEW) => NEW): Graph[K, NEW, EV]

/**
 * Translate edge values using the given TranslateFunction.
 * @param translator implements conversion from EV to NEW
 * @return graph with translated edge values
 */
def translateEdgeValues[NEW](translator: TranslateFunction[EV, NEW]): Graph[K, VV, NEW]

/**
 * Translate edge values using the given function.
 * @param fun implements conversion from EV to NEW
 * @return graph with translated edge values
 */
def translateEdgeValues[NEW](fun: (EV, NEW) => NEW): Graph[K, VV, NEW]
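The signatures show that value translators see only the value and a reuse slot. A minimal plain-Scala sketch of that contract follows; SimpleGraph is a hypothetical model rather than Gelly, and it passes null for reuse on every call.

```scala
// Hypothetical plain-Scala model (not Flink Gelly).
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Value translators receive only the value (plus a reuse slot, always
  // null in this sketch), never the vertex ID or the edge endpoints.
  def translateVertexValues[NEW](fun: (VV, NEW) => NEW): SimpleGraph[K, NEW, EV] =
    SimpleGraph(vertices.map { case (id, v) => id -> fun(v, null.asInstanceOf[NEW]) }, edges)

  def translateEdgeValues[NEW](fun: (EV, NEW) => NEW): SimpleGraph[K, VV, NEW] =
    SimpleGraph(vertices, edges.map { case (s, t, v) => (s, t, fun(v, null.asInstanceOf[NEW])) })
}

val graph = SimpleGraph(Map(1L -> "Alice", 2L -> "Bob"), Seq((1L, 2L, 0.5)))

val nameLengths = graph.translateVertexValues[Int]((name, _) => name.length)
val isStrong = graph.translateEdgeValues[Boolean]((weight, _) => weight > 0.4)
// nameLengths.vertices == Map(1L -> 5, 2L -> 3), isStrong.edges == Seq((1L, 2L, true))
```

If a translation needs the vertex ID or the edge endpoints, mapVertices or mapEdges is the better fit, since their functions receive the whole Vertex or Edge.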

Graph Filtering

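Create subgraphs by applying filter predicates to vertices, edges, or both. filterOnEdges keeps only the edges that satisfy the predicate and leaves the vertex set unchanged. filterOnVertices keeps the vertices that satisfy the predicate and also drops every edge whose source or target was removed. subgraph applies both filters: an edge survives only if both of its endpoints pass the vertex filter and the edge itself passes the edge filter.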

/**
 * Apply filtering functions to the graph and return a sub-graph that
 * satisfies the predicates for both vertices and edges.
 * @param vertexFilter the filter function for vertices.
 * @param edgeFilter the filter function for edges.
 * @return the resulting sub-graph.
 */
def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], 
             edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]

/**
 * Apply filtering functions to the graph and return a sub-graph that
 * satisfies the predicates for both vertices and edges.
 * @param vertexFilterFun the filter function for vertices.
 * @param edgeFilterFun the filter function for edges.
 * @return the resulting sub-graph.
 */
def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean, 
             edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]
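The subgraph rule can be sketched with plain Scala collections. SimpleGraph below is a hypothetical model, not Gelly, run on the same sample vertices and edges as the usage examples on this page.

```scala
// Hypothetical plain-Scala model (not Flink Gelly).
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Models subgraph: keep the vertices that pass vertexFilter, then keep an edge
  // only if both endpoints survived and the edge itself passes edgeFilter.
  def subgraph(vertexFilter: (K, VV) => Boolean,
               edgeFilter: (K, K, EV) => Boolean): SimpleGraph[K, VV, EV] = {
    val kept = vertices.filter { case (id, value) => vertexFilter(id, value) }
    SimpleGraph(kept, edges.filter { case (src, trg, value) =>
      kept.contains(src) && kept.contains(trg) && edgeFilter(src, trg, value) })
  }
}

// Sample data from the usage examples on this page.
val graph = SimpleGraph(
  Map(1L -> "Alice", 2L -> "Bob", 3L -> "Charlie_longname"),
  Seq((1L, 2L, 0.5), (2L, 3L, 0.3), (1L, 3L, 0.8)))

val sub = graph.subgraph((_, name) => name.length <= 10, (_, _, weight) => weight >= 0.3)
// Vertex 3 fails the vertex filter, so (2, 3) and (1, 3) are dropped even though
// their weights pass; sub.edges == Seq((1L, 2L, 0.5))
```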

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the vertices.
 * @param vertexFilter the filter function for vertices.
 * @return the resulting sub-graph.
 */
def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the vertices.
 * @param vertexFilterFun the filter function for vertices.
 * @return the resulting sub-graph.
 */
def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean): Graph[K, VV, EV]
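Because filterOnVertices also removes dangling edges, a vertex filter can empty the edge set entirely. The plain-Scala sketch below, using a hypothetical SimpleGraph model rather than Gelly and the sample data from the usage examples, shows this.

```scala
// Hypothetical plain-Scala model (not Flink Gelly).
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Models filterOnVertices: keep the vertices that pass the predicate and
  // drop every edge whose source or target was removed.
  def filterOnVertices(vertexFilter: (K, VV) => Boolean): SimpleGraph[K, VV, EV] = {
    val kept = vertices.filter { case (id, value) => vertexFilter(id, value) }
    SimpleGraph(kept, edges.filter { case (src, trg, _) => kept.contains(src) && kept.contains(trg) })
  }
}

// Sample data from the usage examples on this page.
val graph = SimpleGraph(
  Map(1L -> "Alice", 2L -> "Bob", 3L -> "Charlie_longname"),
  Seq((1L, 2L, 0.5), (2L, 3L, 0.3), (1L, 3L, 0.8)))

val longNames = graph.filterOnVertices((_, name) => name.length > 5)
// Only vertex 3 survives ("Alice" has exactly 5 characters), and every edge
// touches vertex 1 or 2, so no edges remain.
```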

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the edges.
 * @param edgeFilter the filter function for edges.
 * @return the resulting sub-graph.
 */
def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]

/**
 * Apply a filtering function to the graph and return a sub-graph that
 * satisfies the predicates only for the edges.
 * @param edgeFilterFun the filter function for edges.
 * @return the resulting sub-graph.
 */
def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean): Graph[K, VV, EV]
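By contrast, an edge filter never removes vertices. A plain-Scala sketch with a hypothetical SimpleGraph model (not Gelly) on the page's sample data; the 0.6 threshold is chosen only to leave one vertex isolated.

```scala
// Hypothetical plain-Scala model (not Flink Gelly).
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Models filterOnEdges: keep the edges that pass the predicate and return
  // the vertex set unchanged, even if some vertices become isolated.
  def filterOnEdges(edgeFilter: (K, K, EV) => Boolean): SimpleGraph[K, VV, EV] =
    SimpleGraph(vertices, edges.filter { case (src, trg, value) => edgeFilter(src, trg, value) })
}

// Sample data from the usage examples on this page.
val graph = SimpleGraph(
  Map(1L -> "Alice", 2L -> "Bob", 3L -> "Charlie_longname"),
  Seq((1L, 2L, 0.5), (2L, 3L, 0.3), (1L, 3L, 0.8)))

val heavy = graph.filterOnEdges((_, _, weight) => weight > 0.6)
// heavy.edges == Seq((1L, 3L, 0.8)); vertex 2 has no edges left but is still present
```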

Graph Structure Modifications

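Change the direction of edges while keeping the vertex set and all values unchanged. A Gelly graph is always directed; an undirected graph is represented by adding the opposite-direction copy of every edge, which is what getUndirected() does.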

/**
 * This operation adds all inverse-direction edges to the graph.
 * @return the undirected graph.
 */
def getUndirected(): Graph[K, VV, EV]
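A plain-Scala sketch of getUndirected follows, using a hypothetical SimpleGraph model rather than Gelly. Following the doc comment literally, it adds a reversed copy of every edge without deduplicating, so if the input already contains both directions of a pair, the result holds that pair twice. If an algorithm is sensitive to parallel edges, it is worth checking the input for this first.

```scala
// Hypothetical plain-Scala model (not Flink Gelly).
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Models getUndirected as the doc comment describes it: every edge is kept and
  // its inverse-direction copy (same value) is added. No deduplication happens.
  def getUndirected: SimpleGraph[K, VV, EV] =
    SimpleGraph(vertices, edges.flatMap { case (src, trg, value) =>
      Seq((src, trg, value), (trg, src, value)) })
}

val path = SimpleGraph(Map(1L -> "a", 2L -> "b", 3L -> "c"), Seq((1L, 2L, 0.5), (2L, 3L, 0.3)))
val undirected = path.getUndirected
// undirected.edges == Seq((1L, 2L, 0.5), (2L, 1L, 0.5), (2L, 3L, 0.3), (3L, 2L, 0.3))

// An edge pair that already exists in both directions ends up duplicated.
val mutual = SimpleGraph(Map(1L -> "a", 2L -> "b"), Seq((1L, 2L, 1.0), (2L, 1L, 1.0)))
val doubled = mutual.getUndirected
// doubled.edges.count(_ == (1L, 2L, 1.0)) == 2
```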

/**
 * Reverse the direction of the edges in the graph.
 * @return a new graph with all edges reversed
 */
def reverse(): Graph[K, VV, EV]
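reverse() can be sketched the same way with a hypothetical SimpleGraph model (not Gelly): each edge swaps source and target and keeps its value, so reversing twice gives back the original graph.

```scala
// Hypothetical plain-Scala model (not Flink Gelly).
case class SimpleGraph[K, VV, EV](vertices: Map[K, VV], edges: Seq[(K, K, EV)]) {
  // Models reverse: swap source and target of every edge, keeping the value.
  def reverse: SimpleGraph[K, VV, EV] =
    SimpleGraph(vertices, edges.map { case (src, trg, value) => (trg, src, value) })
}

val graph = SimpleGraph(Map(1L -> "a", 2L -> "b", 3L -> "c"), Seq((1L, 2L, 0.5), (2L, 3L, 0.3)))
val reversed = graph.reverse
// reversed.edges == Seq((2L, 1L, 0.5), (3L, 2L, 0.3)); reversed.reverse == graph
```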

Usage Examples:

import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Create sample graph
val vertices = env.fromCollection(Seq(
  new Vertex(1L, "Alice"),
  new Vertex(2L, "Bob"),
  new Vertex(3L, "Charlie_longname")
))

val edges = env.fromCollection(Seq(
  new Edge(1L, 2L, 0.5),
  new Edge(2L, 3L, 0.3),
  new Edge(1L, 3L, 0.8)
))

val graph = Graph.fromDataSet(vertices, edges, env)

// Vertex transformations
val upperCaseGraph = graph.mapVertices(v => v.getValue.toUpperCase)
val lengthGraph = graph.mapVertices(_.getValue.length)

// Edge transformations  
val doubledWeights = graph.mapEdges(e => e.getValue * 2.0)
val binaryEdges = graph.mapEdges(e => if (e.getValue > 0.5) 1 else 0)

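// Filtering operations
// filterOnVertices also drops edges whose source or target was removed: only
// "Charlie_longname" passes here, and every edge touches vertex 1 or 2, so no edges remain
val longNameVertices = graph.filterOnVertices(_.getValue.length > 5)
val strongEdges = graph.filterOnEdges(_.getValue > 0.4)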
val filteredSubgraph = graph.subgraph(
  vertexFilterFun = _.getValue.length <= 10,
  edgeFilterFun = _.getValue >= 0.3
)

// Structure modifications
val undirectedGraph = graph.getUndirected()
val reversedGraph = graph.reverse()

// ID translation (Long to String); the second argument is a reusable output
// instance that this translator does not need
val stringIdGraph = graph.translateGraphIds[String]((longId, _) => longId.toString)

// Value translation (String to Int)
val intValueGraph = graph.translateVertexValues[Int]((stringValue, _) => stringValue.length)
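To see what some of the examples above compute, the next sketch applies the same per-element functions to the sample data held in plain Scala collections. It illustrates the resulting values only and is not Gelly code. Note that binaryEdges uses a strict > 0.5 test, so the edge with weight exactly 0.5 is labeled 0.

```scala
// Sample data from the usage example, as plain Scala collections (no Flink).
val names = Map(1L -> "Alice", 2L -> "Bob", 3L -> "Charlie_longname")
val weighted = Seq((1L, 2L, 0.5), (2L, 3L, 0.3), (1L, 3L, 0.8))

// Same per-element logic as lengthGraph, doubledWeights, binaryEdges, and stringIdGraph.
val lengthValues = names.map { case (id, name) => id -> name.length }
val doubled = weighted.map { case (src, trg, w) => (src, trg, w * 2.0) }
val binary = weighted.map { case (src, trg, w) => (src, trg, if (w > 0.5) 1 else 0) }
val stringIds = names.keys.map(_.toString).toSet

// lengthValues == Map(1L -> 5, 2L -> 3, 3L -> 16)
// binary labels are 0, 0, 1: the 0.5 edge fails the strict > 0.5 test
```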

Advanced Filtering Patterns

Filters can combine several conditions on vertex values, edge values, and edge endpoints, and transformations can be chained into a single pipeline.

Usage Examples:

// Multi-criteria vertex filtering
val complexVertexFilter = graph.filterOnVertices { vertex =>
  val name = vertex.getValue
  name.length > 3 && name.startsWith("A")
}

// Edge filtering on weight that also excludes self-loops (source == target)
val edgeFilterWithThreshold = graph.filterOnEdges { edge =>
  edge.getValue > 0.5 && edge.getSource != edge.getTarget
}

// Combined subgraph filtering
val analyticsSubgraph = graph.subgraph(
  vertexFilterFun = vertex => {
    val name = vertex.getValue
    name.contains("a") || name.contains("A")
  },
  edgeFilterFun = edge => {
    edge.getValue >= 0.4
  }
)

// Chaining transformations
val processedGraph = graph
  .mapVertices(_.getValue.toLowerCase.trim)
  .filterOnVertices(_.getValue.nonEmpty)
  .mapEdges(e => math.round(e.getValue * 100) / 100.0)
  .filterOnEdges(_.getValue > 0.0)
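Evaluating the first and third filters above by hand on the sample data shows how the endpoint rule for vertex filters plays out. The sketch uses plain Scala collections, not Flink; induced is a hypothetical helper that keeps only the edges whose endpoints both survived, which is how filterOnVertices and subgraph treat dangling edges.

```scala
// Sample data from the usage examples, as plain Scala collections (no Flink).
val names = Map(1L -> "Alice", 2L -> "Bob", 3L -> "Charlie_longname")
val edges = Seq((1L, 2L, 0.5), (2L, 3L, 0.3), (1L, 3L, 0.8))

// Hypothetical helper: the edges whose source and target are both still present.
def induced(kept: Map[Long, String]): Seq[(Long, Long, Double)] =
  edges.filter { case (src, trg, _) => kept.contains(src) && kept.contains(trg) }

// complexVertexFilter: longer than 3 characters and starting with "A".
val complexKept = names.filter { case (_, name) => name.length > 3 && name.startsWith("A") }

// analyticsSubgraph: names containing "a" or "A", then edges with weight >= 0.4.
val analyticsKept = names.filter { case (_, name) => name.contains("a") || name.contains("A") }
val analyticsEdges = induced(analyticsKept).filter { case (_, _, w) => w >= 0.4 }

// complexKept keeps only Alice, so induced(complexKept) is empty (no self-loop on vertex 1).
// analyticsKept keeps Alice and Charlie_longname ("Bob" has no "a"), leaving only (1, 3, 0.8).
```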

Type Safety and Performance

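The transformation operations keep the graph's types explicit throughout a pipeline:

  • Type tracking: each method's return type states which type parameter changes. mapVertices and translateVertexValues replace VV, mapEdges and translateEdgeValues replace EV, translateGraphIds replaces K, and the filtering and structural operations return Graph[K, VV, EV] unchanged.
  • Type information: Flink needs TypeInformation for every new ID or value type; in Scala it is normally derived implicitly once org.apache.flink.api.scala._ is imported.
  • Lazy evaluation: transformations only extend the Flink dataflow plan. Nothing runs until the program triggers execution, for example by calling collect() or print() on a result DataSet, or env.execute() after defining a sink.
  • Distributed processing: each operation becomes ordinary parallel DataSet operators (such as maps, filters, and joins) and runs across the Flink cluster like any other DataSet program.
  • Functional composition: operations can be chained into longer processing pipelines.

All transformation methods return a new Graph and leave the original unchanged, so one input graph can feed several independent transformations.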