tessl/maven-org-apache-flink--flink-gelly-scala_2-10

Scala API for Apache Flink's Gelly graph processing library providing distributed graph operations and algorithms

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-gelly-scala_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-gelly-scala_2-10@1.3.0


Apache Flink Gelly Scala API

Overview

Apache Flink Gelly Scala is the Scala API for Gelly, Apache Flink's graph processing library. It wraps the Java Gelly API so that distributed graph analytics and algorithms can be written in idiomatic Scala, for example by passing Scala functions where the Java API expects function interfaces. It supports vertex-centric (Pregel-style), scatter-gather, and gather-sum-apply iterations, plus graph construction, transformation, mutation, and analytics operations.

Package Information

  • Package Name: flink-gelly-scala_2.10
  • Package Type: maven
  • Language: Scala 2.10
  • Installation: add as a Maven dependency

Installation

Add to your Maven pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-scala_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Core Imports

import org.apache.flink.graph.scala._
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}

Basic Usage

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create vertices and edges
val vertices = env.fromCollection(List(
  new Vertex(1L, "A"),
  new Vertex(2L, "B"),
  new Vertex(3L, "C")
))

val edges = env.fromCollection(List(
  new Edge(1L, 2L, 1.0),
  new Edge(2L, 3L, 2.0)
))

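// Create graph
val graph = Graph.fromDataSet(vertices, edges, env)

// Basic operations
// numberOfVertices() runs a Flink job right away and returns a Long
val vertexCount = graph.numberOfVertices()
println(s"vertices: $vertexCount")

// getDegrees() is lazy: print() triggers execution and prints the (vertexId, degree) pairs
val degrees = graph.getDegrees()
degrees.print()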

Architecture

The Flink Gelly Scala library consists of several key components:

  • Graph Class: The main graph representation with type-safe operations
  • Graph Factory Methods: Multiple ways to create graphs from various data sources
  • Graph Transformations: Methods for mapping, filtering, and transforming graphs
  • Graph Analytics: Built-in algorithms and metrics calculations
  • Iterative Processing: Support for custom graph algorithms using different iteration patterns

Core Types

// Main graph type with key (K), vertex value (VV), and edge value (EV) type parameters
final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]

// Core graph elements from the Flink Gelly Java API (Vertex, Edge, and Triplet extend Flink's Tuple2, Tuple3, and Tuple5)
class Vertex[K, VV](id: K, value: VV)
class Edge[K, EV](source: K, target: K, value: EV)
class Triplet[K, VV, EV](srcVertexId: K, trgVertexId: K, srcVertexValue: VV, trgVertexValue: VV, edgeValue: EV)
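A minimal sketch of using the element types on their own, assuming only the Java Gelly classes `Vertex` and `Edge` on the classpath. No Flink job runs here; the values are made up for illustration.

```scala
import org.apache.flink.graph.{Edge, Vertex}

// A vertex is an (id, value) pair; Vertex extends Flink's Tuple2
val v = new Vertex(1L, "A")

// An edge is a (source, target, value) triple; Edge extends Flink's Tuple3
val e = new Edge(1L, 2L, 0.5)

// reverse() returns a new edge with source and target swapped
val flipped = e.reverse()

println(s"vertex ${v.getId} has value ${v.getValue}")
println(s"edge ${flipped.getSource} -> ${flipped.getTarget} (${flipped.getValue})")
```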

Graph Construction

Create graphs from various data sources including DataSets, collections, tuples, and CSV files.

Basic Construction

// From DataSets
def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromDataSet[K, EV](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromDataSet[K, VV, EV](edges: DataSet[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV]

// From Collections
def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromCollection[K, EV](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]

Tuple-based Construction

// From Tuples - convenient for creating graphs from structured data
def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromTupleDataSet[K, EV](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromTuple2DataSet[K](edges: DataSet[(K, K)], env: ExecutionEnvironment): Graph[K, NullValue, NullValue]

// From CSV Files - extensive configuration for file-based graph creation
def fromCsvReader[K, VV, EV](env: ExecutionEnvironment, pathEdges: String, ...): Graph[K, VV, EV]
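A short sketch of the tuple-based and edge-only factory methods above, assuming `flink-gelly-scala_2.10` and the Flink Scala API are on the classpath and the snippet runs as a Scala script against a local `ExecutionEnvironment`. The three-vertex graph is hypothetical example data.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._
import org.apache.flink.types.NullValue

val env = ExecutionEnvironment.getExecutionEnvironment

// Vertices as (id, value) tuples and edges as (source, target, value) tuples
val vertexTuples = env.fromCollection(Seq((1L, "A"), (2L, "B"), (3L, "C")))
val edgeTuples = env.fromCollection(Seq((1L, 2L, 1.0), (2L, 3L, 2.0)))
val tupleGraph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env)

// Edge-only construction: the vertex set is derived from the edge endpoints
// and every vertex value is NullValue
val edgeOnlyGraph: Graph[Long, NullValue, Double] =
  Graph.fromCollection(Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)), env)
```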

See also: Complete Graph Construction Documentation (docs/graph-construction.md)

Graph Operations

Access graph data and perform basic transformations.

Data Access

// Access graph components
def getVertices: DataSet[Vertex[K, VV]]
def getEdges: DataSet[Edge[K, EV]]
def getVerticesAsTuple2(): DataSet[(K, VV)]
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
def getTriplets(): DataSet[Triplet[K, VV, EV]]

// Graph size (numberOfVertices/numberOfEdges run a job and return the count) and element identifiers
def numberOfVertices(): Long
def numberOfEdges(): Long
def getVertexIds(): DataSet[K]
def getEdgeIds(): DataSet[(K, K)]
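A minimal sketch of the accessors above on a small hypothetical graph, assuming the same classpath and local execution as a Scala script. The `getSrcVertex`, `getTrgVertex`, and `getEdge` calls are methods of the Java `Triplet` class.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  Seq(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// numberOfEdges() runs a job and returns a plain Long
val edgeCount = graph.numberOfEdges()

// getEdgeIds() projects every edge to its (source, target) pair
val edgeIds = graph.getEdgeIds().collect()

// A triplet combines an edge with the values of both endpoint vertices
val triplets = graph.getTriplets().collect()
for (t <- triplets) {
  println(s"${t.getSrcVertex.getValue} -> ${t.getTrgVertex.getValue} (weight ${t.getEdge.getValue})")
}
```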

Transformations

// Map transformations
def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV]
def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV]
def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV]

// Translation operations
def translateGraphIds[NEW: TypeInformation : ClassTag](translator: TranslateFunction[K, NEW]): Graph[NEW, VV, EV]
def translateVertexValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[VV, NEW]): Graph[K, NEW, EV]
def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[EV, NEW]): Graph[K, VV, NEW]
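A sketch of the Scala-function variants of `mapVertices` and `mapEdges`, assuming the same setup as a local Scala script; the graph and the mapping functions are illustrative only.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  Seq(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// mapVertices: the new vertex value is computed from the whole Vertex (id and value)
val lowerCased = graph.mapVertices(v => v.getValue.toLowerCase)

// mapEdges: the new edge value is computed from the whole Edge; the value type may change
val scaled = graph.mapEdges(e => e.getValue * 10)
```

Both calls return new `Graph` instances with the same vertex ids and edge structure; only the values change.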

See also: Complete Graph Operations Documentation (docs/graph-operations.md)

Graph Analytics

Perform analytics and computations on graph structure.

Degree Calculations

// Degree metrics: (vertexId, degree) pairs; getDegrees() is the sum of in- and out-degree
def inDegrees(): DataSet[(K, LongValue)]
def outDegrees(): DataSet[(K, LongValue)]
def getDegrees(): DataSet[(K, LongValue)]
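A minimal sketch of the three degree methods on the hypothetical path graph 1 -> 2 -> 3, assuming local execution as a Scala script. The helper `degreeMap` is introduced here for illustration only.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.types.LongValue

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  Seq(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// Hypothetical helper: collects (vertexId, degree) pairs into a Map, unwrapping Flink's LongValue
def degreeMap(degrees: DataSet[(Long, LongValue)]): Map[Long, Long] =
  degrees.collect().map { case (id, degree) => id -> degree.getValue }.toMap

val inDeg = degreeMap(graph.inDegrees())
val outDeg = degreeMap(graph.outDegrees())
val totalDeg = degreeMap(graph.getDegrees())
println(s"in: $inDeg, out: $outDeg, total: $totalDeg")
```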

Graph Structure Operations

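// Structural transformations: getUndirected() adds the reverse of every edge, reverse() flips every edge,
// and subgraph() also drops edges whose source or target vertex was filtered out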
def getUndirected(): Graph[K, VV, EV]
def reverse(): Graph[K, VV, EV]
def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV]
def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
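A sketch of the structural operations on a small hypothetical graph, assuming local execution as a Scala script. It assumes the Scala-function overloads of `filterOnEdges` and `subgraph` (taking `Edge[K, EV] => Boolean` and `Vertex[K, VV] => Boolean`), which the Scala `Graph` class provides next to the `FilterFunction` variants listed above; check them against your Flink version.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
// 1 -> 2 (1.0), 2 -> 3 (2.0)
val graph = Graph.fromCollection(
  Seq(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// Every edge gets a reversed twin, so the edge count doubles
val undirected = graph.getUndirected()

// Every edge points the other way
val reversed = graph.reverse()

// Keep only edges heavier than 1.5; all vertices are kept
val heavy = graph.filterOnEdges((e: Edge[Long, Double]) => e.getValue > 1.5)

// Drop vertex 3; the edge 2 -> 3 disappears with it
val sub = graph.subgraph((v: Vertex[Long, String]) => v.getId != 3L, (e: Edge[Long, Double]) => true)
```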

Set Operations

// Graph set operations
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]

// Graph validation
def validate(validator: GraphValidator[K, VV, EV]): Boolean
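A sketch of the set operations and validation on two overlapping hypothetical graphs, assuming local execution as a Scala script. For validation it uses `InvalidVertexIdsValidator` from `org.apache.flink.graph.validation`, which reports whether every edge endpoint exists in the vertex set.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.validation.InvalidVertexIdsValidator

val env = ExecutionEnvironment.getExecutionEnvironment

// g1: 1 -> 2 -> 3
val g1 = Graph.fromCollection(
  Seq(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)), env)

// g2: 2 -> 3 -> 4, sharing vertices 2 and 3 and the edge (2, 3, 2.0) with g1
val g2 = Graph.fromCollection(
  Seq(new Vertex(2L, "B"), new Vertex(3L, "C"), new Vertex(4L, "D")),
  Seq(new Edge(2L, 3L, 2.0), new Edge(3L, 4L, 3.0)), env)

val unioned = g1.union(g2)          // duplicate vertices are removed
val common = g1.intersect(g2, true) // edges with equal source, target, and value
val onlyG1 = g1.difference(g2)      // removes g2's vertices, and edges touching them, from g1

// A graph whose only edge points at a vertex missing from the vertex set
val broken = Graph.fromCollection(Seq(new Vertex(1L, "A")), Seq(new Edge(1L, 2L, 1.0)), env)

val g1Valid = g1.validate(new InvalidVertexIdsValidator[Long, String, Double])
val brokenValid = broken.validate(new InvalidVertexIdsValidator[Long, String, Double])
```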

See also: Complete Graph Analytics Documentation (docs/graph-analytics.md)

Iterative Algorithms

Support for implementing custom graph algorithms using different iteration patterns.

Iteration Types

// Scatter-gather iterations (ScatterFunction and GatherFunction from org.apache.flink.graph.spargel)
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV], gatherFunction: GatherFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]

// Gather-sum-apply iterations (GatherFunction, SumFunction, and ApplyFunction from org.apache.flink.graph.gsa;
// this GatherFunction is a different class from the scatter-gather one)
def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction: SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]

// Vertex-centric iterations, Pregel-style (ComputeFunction and MessageCombiner from org.apache.flink.graph.pregel)
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M], combineFunction: MessageCombiner[K, M], maxIterations: Int): Graph[K, VV, EV]

// Algorithm framework integration
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]
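A sketch of a vertex-centric iteration, using single-source shortest paths as the illustration and assuming local execution as a Scala script. The class names `SSSPCompute` and `MinDistanceCombiner` and the example graph are hypothetical; `getEdges`, `sendMessageTo`, `setNewVertexValue`, and `sendCombinedMessage` are helper methods inherited from Gelly's `ComputeFunction` and `MessageCombiner`.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, MessageIterator}

// Hypothetical compute function: vertex values are distances, messages are candidate distances
final class SSSPCompute(srcId: Long) extends ComputeFunction[Long, Double, Double, Double] {
  override def compute(vertex: Vertex[Long, Double], messages: MessageIterator[Double]): Unit = {
    // The source starts at 0; every other vertex takes the smallest candidate it received
    var minDistance = if (vertex.getId == srcId) 0.0 else Double.PositiveInfinity
    while (messages.hasNext) {
      val candidate = messages.next()
      if (candidate < minDistance) minDistance = candidate
    }
    // Only an improved distance is stored and propagated along the out-edges
    if (minDistance < vertex.getValue) {
      setNewVertexValue(minDistance)
      val edges = getEdges.iterator()
      while (edges.hasNext) {
        val edge = edges.next()
        sendMessageTo(edge.getTarget, minDistance + edge.getValue)
      }
    }
  }
}

// Hypothetical combiner: merges all candidates sent to the same vertex into their minimum
final class MinDistanceCombiner extends MessageCombiner[Long, Double] {
  override def combineMessages(messages: MessageIterator[Double]): Unit = {
    var min = Double.PositiveInfinity
    while (messages.hasNext) {
      val candidate = messages.next()
      if (candidate < min) min = candidate
    }
    sendCombinedMessage(min)
  }
}

val env = ExecutionEnvironment.getExecutionEnvironment
// All vertices start at infinity; edge values are distances
val graph = Graph.fromCollection(
  Seq(new Vertex(1L, Double.PositiveInfinity), new Vertex(2L, Double.PositiveInfinity),
    new Vertex(3L, Double.PositiveInfinity)),
  Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0), new Edge(1L, 3L, 5.0)),
  env)

// At most 10 supersteps; the iteration also ends once no messages are sent
val result = graph.runVertexCentricIteration(new SSSPCompute(1L), new MinDistanceCombiner, 10)
val distances = result.getVertices.collect().map(v => v.getId -> v.getValue).toMap
```

The combiner is optional, but it reduces the number of messages shipped between workers because only the smallest candidate per target vertex needs to travel.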

See also: Complete Iterative Algorithms Documentation (docs/iterative-algorithms.md)

Built-in Graph Algorithms

Access to Gelly's algorithm library (package org.apache.flink.graph.library) through the Scala API.

Pre-implemented Algorithms

Available Algorithms:

  • PageRank - Computes the PageRank scores for graph vertices
  • Connected Components - Labels vertices by connected component membership
  • Single Source Shortest Paths - Computes shortest paths from a source vertex
  • Triangle Enumeration - Enumerates all triangles in the graph
  • HITS Algorithm - Computes hub and authority scores
  • Clustering Coefficients - Measures clustering for vertices
  • Community Detection - Identifies communities using label propagation
  • Graph Metrics - Vertex and edge metrics summarizing graph structure, such as counts and degree statistics

Usage Examples:

import org.apache.flink.graph.library.{PageRank, ConnectedComponents, SingleSourceShortestPaths}

// Gelly library algorithms are Java classes, so their constructors take positional
// arguments (Scala named arguments are not available for them)

// PageRank with damping factor 0.85 and at most 10 iterations
val pageRankResult = graph.run(new PageRank[Long](0.85, 10))

// Connected Components - finds connected subgraphs; vertex values serve as initial component labels
val components = graph.run(new ConnectedComponents[Long, String, Double](10))

// Single Source Shortest Paths from vertex 1L, at most 10 iterations; Double edge values are the distances
val shortestPaths = graph.run(new SingleSourceShortestPaths[Long, String](1L, 10))

Neighborhood Operations

Operations for working with vertex neighborhoods and performing aggregations.

// Edge-based aggregations
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

// Neighbor-based aggregations
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

// Reduction operations
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]
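A sketch of the two reduction operations, assuming local execution as a Scala script. The classes `MinEdgeValue` and `SumNeighborValues` and the example graph are hypothetical; they implement Gelly's `ReduceEdgesFunction` and `ReduceNeighborsFunction` interfaces.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceEdgesFunction, ReduceNeighborsFunction, Vertex}

// Hypothetical reducer: keeps the smallest edge value
final class MinEdgeValue extends ReduceEdgesFunction[Double] {
  override def reduceEdges(first: Double, second: Double): Double = math.min(first, second)
}

// Hypothetical reducer: adds up neighbor vertex values
final class SumNeighborValues extends ReduceNeighborsFunction[Double] {
  override def reduceNeighbors(first: Double, second: Double): Double = first + second
}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  Seq(new Vertex(1L, 1.0), new Vertex(2L, 2.0), new Vertex(3L, 3.0)),
  Seq(new Edge(1L, 2L, 1.0), new Edge(1L, 3L, 5.0), new Edge(2L, 3L, 2.0)),
  env)

// OUT: edges are grouped by source vertex; vertices without out-edges produce no result
val minOutEdge = graph.reduceOnEdges(new MinEdgeValue, EdgeDirection.OUT).collect().toMap

// IN: for each vertex, combine the values of the vertices that point to it
val inNeighborSum = graph.reduceOnNeighbors(new SumNeighborValues, EdgeDirection.IN).collect().toMap
```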

Graph Mutations

Methods for adding and removing vertices and edges.

// Adding elements
def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV]

// Removing elements
def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV]
def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
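A sketch of chaining mutations on a small hypothetical graph, assuming local execution as a Scala script. Each call returns a new `Graph`, so the original stays unchanged.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  Seq(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// Add an isolated vertex 4, then connect 3 -> 4
val withD = graph.addVertex(new Vertex(4L, "D"))
val withEdge = withD.addEdge(new Vertex(3L, "C"), new Vertex(4L, "D"), 3.0)

// Removing vertex 2 also removes its incident edges 1 -> 2 and 2 -> 3
val withoutB = withEdge.removeVertex(new Vertex(2L, "B"))
```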

Join Operations

Join graph data with external datasets to enrich vertex and edge information.

// Vertex joins
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV]
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV): Graph[K, VV, EV]

// Edge joins
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV): Graph[K, VV, EV]
def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
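A sketch of the Scala-function variants of `joinWithVertices` and `joinWithEdges`, assuming local execution as a Scala script. The suffix and factor datasets are hypothetical example inputs; vertices and edges without a matching key keep their original values.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

val env = ExecutionEnvironment.getExecutionEnvironment
val graph = Graph.fromCollection(
  Seq(new Vertex(1L, "A"), new Vertex(2L, "B"), new Vertex(3L, "C")),
  Seq(new Edge(1L, 2L, 1.0), new Edge(2L, 3L, 2.0)),
  env)

// (vertexId, suffix) pairs; vertex 2 has no match and keeps its value
val suffixes = env.fromCollection(Seq((1L, "!"), (3L, "?")))
val tagged = graph.joinWithVertices(suffixes, (value: String, suffix: String) => value + suffix)

// (source, target, factor) triples; only the edge 1 -> 2 matches
val factors = env.fromCollection(Seq((1L, 2L, 10.0)))
val reweighted = graph.joinWithEdges(factors, (value: Double, factor: Double) => value * factor)

val vertexValues = tagged.getVertices.collect().map(v => v.getId -> v.getValue).toMap
val edgeValues = reweighted.getEdges.collect().map(e => (e.getSource, e.getTarget) -> e.getValue).toMap
```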