tessl/maven-org-apache-flink--flink-gelly-scala-2-10

Scala API for Apache Flink's Gelly graph processing library providing distributed graph operations and algorithms

Apache Flink Gelly Scala API

Overview

Apache Flink Gelly Scala is the Scala API for Gelly, Flink's graph processing library. It wraps Gelly's Java Graph in a Scala Graph class built on Flink's DataSet API, so you can write graph construction, transformation, and analytics in idiomatic Scala. For custom algorithms it supports vertex-centric (Pregel-style), scatter-gather, and gather-sum-apply iterations.

Package Information

  • Package Name: flink-gelly-scala_2.10
  • Package Type: maven
  • Language: Scala 2.10
  • Installation: Add as a Maven dependency

Installation

Add to your Maven pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-gelly-scala_2.10</artifactId>
    <version>1.3.3</version>
</dependency>

Core Imports

import org.apache.flink.graph.scala._
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}

Basic Usage

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.{Edge, Vertex}

// Set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create vertices and edges
val vertices = env.fromCollection(List(
  new Vertex(1L, "A"),
  new Vertex(2L, "B"),
  new Vertex(3L, "C")
))

val edges = env.fromCollection(List(
  new Edge(1L, 2L, 1.0),
  new Edge(2L, 3L, 2.0)
))

// Create graph
val graph = Graph.fromDataSet(vertices, edges, env)

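// Basic operations
val vertexCount = graph.numberOfVertices() // triggers job execution and returns a Long
val degrees = graph.getDegrees()           // lazy DataSet[(Long, LongValue)]
degrees.print()                            // executes the plan and prints each (id, degree) pair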

Architecture

The Flink Gelly Scala library consists of several key components:

  • Graph Class: The main graph representation, with type-safe operations over vertex IDs, vertex values, and edge values
  • Graph Factory Methods: Methods on the Graph companion object, such as fromDataSet, fromCollection, fromTupleDataSet, and fromCsvReader, that create graphs from various data sources
  • Graph Transformations: Methods for mapping, filtering, and transforming graphs
  • Graph Analytics: Built-in algorithms and metrics
  • Iterative Processing: Scatter-gather, gather-sum-apply, and vertex-centric iterations for custom graph algorithms

Core Types

// Main graph type with key (K), vertex value (VV), and edge value (EV) type parameters;
// it wraps a Java Gelly graph (org.apache.flink.graph.Graph)
final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]

// Core graph elements from the Flink Gelly Java API (simplified signatures;
// Vertex, Edge, and Triplet extend Flink's Tuple2, Tuple3, and Tuple5 respectively)
class Vertex[K, VV](id: K, value: VV)
class Edge[K, EV](source: K, target: K, value: EV)
class Triplet[K, VV, EV](srcVertexId: K, trgVertexId: K, srcVertexValue: VV, trgVertexValue: VV, edgeValue: EV)

Graph Construction

Create graphs from various data sources including DataSets, collections, tuples, and CSV files.

Basic Construction

// From DataSets
def fromDataSet[K, VV, EV](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromDataSet[K, EV](edges: DataSet[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromDataSet[K, VV, EV](edges: DataSet[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV]

// From Collections
def fromCollection[K, VV, EV](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromCollection[K, EV](edges: Seq[Edge[K, EV]], env: ExecutionEnvironment): Graph[K, NullValue, EV]
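Some overloads take only edges, such as fromDataSet(edges, env) and fromCollection(edges, env). In that case Gelly derives the vertex set from the distinct edge endpoints and gives each vertex a NullValue. The vertexValueInitializer overload instead computes each vertex value from its ID. The sketch below models those semantics on in-memory collections. It has no Flink dependency, and SimpleVertex, SimpleEdge, and fromEdges are hypothetical names, not Gelly API.

```scala
object ConstructionModel {
  // Hypothetical stand-ins for Gelly's Vertex and Edge (not the real classes).
  final case class SimpleVertex[K, VV](id: K, value: VV)
  final case class SimpleEdge[K, EV](source: K, target: K, value: EV)

  // Collect every distinct endpoint ID (first-occurrence order), then derive
  // its value with `init`, the role Gelly's vertexValueInitializer plays.
  def fromEdges[K, VV, EV](edges: Seq[SimpleEdge[K, EV]])(init: K => VV): Seq[SimpleVertex[K, VV]] =
    edges
      .flatMap(e => Seq(e.source, e.target))
      .distinct
      .map(id => SimpleVertex(id, init(id)))
}
```

For example, the two edges 1→2 and 2→3 with init = id => id * 10 yield the vertices (1, 10), (2, 20), and (3, 30).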

Tuple-based Construction

// From Tuples - convenient for creating graphs from structured data
def fromTupleDataSet[K, VV, EV](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, VV, EV]
def fromTupleDataSet[K, EV](edges: DataSet[(K, K, EV)], env: ExecutionEnvironment): Graph[K, NullValue, EV]
def fromTuple2DataSet[K](edges: DataSet[(K, K)], env: ExecutionEnvironment): Graph[K, NullValue, NullValue]

// From CSV Files - extensive configuration for file-based graph creation
def fromCsvReader[K, VV, EV](env: ExecutionEnvironment, pathEdges: String, ...): Graph[K, VV, EV]

Graph Operations

Access graph data and perform basic transformations.

Data Access

// Access graph components
def getVertices: DataSet[Vertex[K, VV]]
def getEdges: DataSet[Edge[K, EV]]
def getVerticesAsTuple2(): DataSet[(K, VV)]
def getEdgesAsTuple3(): DataSet[(K, K, EV)]
def getTriplets(): DataSet[Triplet[K, VV, EV]]

// Counts (numberOfVertices and numberOfEdges trigger job execution) and ID projections
def numberOfVertices(): Long
def numberOfEdges(): Long
def getVertexIds(): DataSet[K]
def getEdgeIds(): DataSet[(K, K)]
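A triplet is an edge combined with the values of its source and target vertices, which amounts to joining the edge set with the vertex set on both endpoints. The plain-Scala sketch below illustrates that idea. It is not Gelly's implementation, the names are hypothetical, and an edge whose endpoint is missing simply finds no join partner here.

```scala
object TripletModel {
  // Hypothetical stand-ins for Gelly's Vertex, Edge, and Triplet.
  final case class SimpleVertex[K, VV](id: K, value: VV)
  final case class SimpleEdge[K, EV](source: K, target: K, value: EV)
  final case class SimpleTriplet[K, VV, EV](srcId: K, trgId: K, srcValue: VV, trgValue: VV, edgeValue: EV)

  // Inner join of each edge with its source vertex and its target vertex.
  def triplets[K, VV, EV](vertices: Seq[SimpleVertex[K, VV]],
                          edges: Seq[SimpleEdge[K, EV]]): Seq[SimpleTriplet[K, VV, EV]] = {
    val valueOf: Map[K, VV] = vertices.map(v => v.id -> v.value).toMap
    for {
      e <- edges
      s <- valueOf.get(e.source) // no source vertex: edge yields no triplet
      t <- valueOf.get(e.target) // no target vertex: edge yields no triplet
    } yield SimpleTriplet(e.source, e.target, s, t, e.value)
  }
}
```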

Transformations

// Map transformations
def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]): Graph[K, NV, EV]
def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV]
def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K, VV, NV]
def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV]

// Translation operations
def translateGraphIds[NEW: TypeInformation : ClassTag](translator: TranslateFunction[K, NEW]): Graph[NEW, VV, EV]
def translateVertexValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[VV, NEW]): Graph[K, NEW, EV]
def translateEdgeValues[NEW: TypeInformation : ClassTag](translator: TranslateFunction[EV, NEW]): Graph[K, VV, NEW]
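The signatures above differ in what they rewrite. mapVertices computes a new value from each whole vertex and leaves IDs and edges alone. translateGraphIds changes the key type, so the same translation must be applied to vertex IDs and to both endpoints of every edge. The sketch below models that contrast in plain Scala. SimpleGraph and the other case classes are hypothetical, not Gelly types.

```scala
object TransformModel {
  final case class SimpleVertex[K, VV](id: K, value: VV)
  final case class SimpleEdge[K, EV](source: K, target: K, value: EV)
  final case class SimpleGraph[K, VV, EV](vertices: Seq[SimpleVertex[K, VV]], edges: Seq[SimpleEdge[K, EV]])

  // mapVertices: the new value is computed from the whole vertex; IDs and edges are untouched.
  def mapVertices[K, VV, EV, NV](g: SimpleGraph[K, VV, EV])(f: SimpleVertex[K, VV] => NV): SimpleGraph[K, NV, EV] =
    SimpleGraph(g.vertices.map(v => SimpleVertex(v.id, f(v))), g.edges)

  // translateGraphIds: one ID translation applied to vertex IDs and to both edge endpoints.
  def translateGraphIds[K, NK, VV, EV](g: SimpleGraph[K, VV, EV])(t: K => NK): SimpleGraph[NK, VV, EV] =
    SimpleGraph(
      g.vertices.map(v => SimpleVertex(t(v.id), v.value)),
      g.edges.map(e => SimpleEdge(t(e.source), t(e.target), e.value)))
}
```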

Graph Analytics

Perform analytics and computations on graph structure.

Degree Calculations

// Degree metrics
def inDegrees(): DataSet[(K, LongValue)]
def outDegrees(): DataSet[(K, LongValue)]
def getDegrees(): DataSet[(K, LongValue)]
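The three degree methods count, per vertex, the edges that point at it, the edges that leave it, and the sum of both. The plain-Scala sketch below illustrates this. Gelly returns LongValue counts in a DataSet, while this model uses plain Long values in a Map and assumes, as an illustration, that vertices with no incident edges are reported with degree 0. The object and method names are hypothetical.

```scala
object DegreeModel {
  final case class SimpleEdge(source: Long, target: Long)

  // Count occurrences of each key, reporting 0 for IDs that never occur.
  private def count(ids: Seq[Long], keys: Seq[Long]): Map[Long, Long] = {
    val counts = keys.groupBy(identity).map { case (k, ks) => k -> ks.size.toLong }
    ids.map(id => id -> counts.getOrElse(id, 0L)).toMap
  }

  // In-degree: number of edges whose target is the vertex.
  def inDegrees(ids: Seq[Long], edges: Seq[SimpleEdge]): Map[Long, Long] = count(ids, edges.map(_.target))

  // Out-degree: number of edges whose source is the vertex.
  def outDegrees(ids: Seq[Long], edges: Seq[SimpleEdge]): Map[Long, Long] = count(ids, edges.map(_.source))

  // Total degree: in-degree plus out-degree.
  def degrees(ids: Seq[Long], edges: Seq[SimpleEdge]): Map[Long, Long] = {
    val in = inDegrees(ids, edges)
    val out = outDegrees(ids, edges)
    ids.map(id => id -> (in(id) + out(id))).toMap
  }
}
```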

Graph Structure Operations

// Structural transformations
def getUndirected(): Graph[K, VV, EV]
def reverse(): Graph[K, VV, EV]
def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]): Graph[K, VV, EV]
def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]): Graph[K, VV, EV]
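The structural operations can be pictured on plain collections:

- reverse flips every edge.
- getUndirected keeps each edge and adds its reversed copy.
- subgraph filters the vertices first. It then keeps only the edges that pass the edge filter and whose endpoints both survived.

The following is a plain-Scala sketch of that behaviour with hypothetical names, not Gelly's implementation.

```scala
object StructureModel {
  final case class SimpleVertex(id: Long, value: String)
  final case class SimpleEdge(source: Long, target: Long, value: Double)

  // reverse: every edge flips direction; vertices are unchanged.
  def reverse(edges: Seq[SimpleEdge]): Seq[SimpleEdge] =
    edges.map(e => SimpleEdge(e.target, e.source, e.value))

  // getUndirected: keep each edge and add its reversed copy.
  def undirected(edges: Seq[SimpleEdge]): Seq[SimpleEdge] =
    edges.flatMap(e => Seq(e, SimpleEdge(e.target, e.source, e.value)))

  // subgraph: filter vertices, then keep edges that pass the edge filter
  // and whose source and target both survived the vertex filter.
  def subgraph(vertices: Seq[SimpleVertex], edges: Seq[SimpleEdge])(
      vertexFilter: SimpleVertex => Boolean,
      edgeFilter: SimpleEdge => Boolean): (Seq[SimpleVertex], Seq[SimpleEdge]) = {
    val kept = vertices.filter(vertexFilter)
    val keptIds = kept.map(_.id).toSet
    (kept, edges.filter(e => edgeFilter(e) && keptIds(e.source) && keptIds(e.target)))
  }
}
```

Filtering out vertex 3 from the graph 1→2→3 therefore also drops the edge 2→3, even though the edge filter accepts it.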

Set Operations

// Graph set operations
def union(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def difference(graph: Graph[K, VV, EV]): Graph[K, VV, EV]
def intersect(graph: Graph[K, VV, EV], distinctEdges: Boolean): Graph[K, NullValue, EV]

// Graph validation
def validate(validator: GraphValidator[K, VV, EV]): Boolean
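The sketch below models two of the set operations on in-memory collections:

- union merges the vertex sets without duplicates and concatenates the edge lists, so an edge present in both graphs appears twice.
- difference removes from this graph every vertex that also occurs in the other graph, along with all edges touching those vertices.

This is an approximation of Gelly's behaviour written as an illustration; verify the exact semantics against the Gelly documentation for your release. The names are hypothetical.

```scala
object SetOpsModel {
  final case class SimpleVertex(id: Long, value: String)
  final case class SimpleEdge(source: Long, target: Long, value: Double)
  final case class SimpleGraph(vertices: Seq[SimpleVertex], edges: Seq[SimpleEdge])

  // union: distinct vertices, edges concatenated (duplicates preserved).
  def union(a: SimpleGraph, b: SimpleGraph): SimpleGraph =
    SimpleGraph((a.vertices ++ b.vertices).distinct, a.edges ++ b.edges)

  // difference: drop vertices of `a` whose ID occurs in `b`, plus every edge touching them.
  def difference(a: SimpleGraph, b: SimpleGraph): SimpleGraph = {
    val removed = b.vertices.map(_.id).toSet
    SimpleGraph(
      a.vertices.filterNot(v => removed(v.id)),
      a.edges.filterNot(e => removed(e.source) || removed(e.target)))
  }
}
```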

Iterative Algorithms

Custom graph algorithms can be implemented with three iteration models: scatter-gather, gather-sum-apply, and vertex-centric.

Iteration Types

// Scatter-Gather iterations
def runScatterGatherIteration[M](scatterFunction: ScatterFunction[K, VV, M, EV], gatherFunction: GatherFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]

// Gather-Sum-Apply iterations  
def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction: SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K, VV, EV]

// Vertex-centric iterations (Pregel-style)
def runVertexCentricIteration[M](computeFunction: ComputeFunction[K, VV, EV, M], combineFunction: MessageCombiner[K, M], maxIterations: Int): Graph[K, VV, EV]
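Scatter-gather proceeds in supersteps. In the scatter phase each vertex sends messages along its edges. In the gather phase each vertex folds the messages it received into a new value. In Gelly these phases are a ScatterFunction (its sendMessages method) and a GatherFunction (its updateVertex method), passed to runScatterGatherIteration. The plain-Scala sketch below simulates that loop for single-source shortest paths without Flink; the object and method names are hypothetical. For simplicity, every reached vertex re-scatters in each superstep, whereas Gelly only runs scatter for vertices updated in the previous superstep. The resulting distances are the same.

```scala
object ScatterGatherModel {
  final case class SimpleEdge(source: Long, target: Long, weight: Double)

  def shortestPaths(vertexIds: Seq[Long], edges: Seq[SimpleEdge],
                    source: Long, maxIterations: Int): Map[Long, Double] = {
    var dist: Map[Long, Double] =
      vertexIds.map(id => id -> (if (id == source) 0.0 else Double.PositiveInfinity)).toMap
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      // Scatter: each reached vertex sends (its distance + edge weight) to the edge target.
      val inbox: Map[Long, Seq[Double]] = edges
        .filter(e => dist(e.source) < Double.PositiveInfinity)
        .map(e => e.target -> (dist(e.source) + e.weight))
        .groupBy(_._1)
        .map { case (target, msgs) => target -> msgs.map(_._2) }
      // Gather: a vertex keeps the minimum of its current distance and its messages.
      val updated = dist.map { case (id, d) =>
        id -> inbox.get(id).map(msgs => math.min(d, msgs.min)).getOrElse(d)
      }
      changed = updated != dist // converged when no vertex value changes
      dist = updated
      iteration += 1
    }
    dist
  }
}
```

The same loop structure underlies the other two models. Gather-sum-apply splits the gather step into per-edge gather, pairwise sum, and apply. Vertex-centric iteration merges scatter and gather into one compute function per vertex.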

Built-in Graph Algorithms

Algorithms from the Gelly library can be executed through the Scala API with Graph.run.

Pre-implemented Algorithms

// Algorithm execution
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]): T
def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]): GraphAnalytic[K, VV, EV, T]

Available Algorithms:

  • PageRank - Computes the PageRank scores for graph vertices
  • Connected Components - Labels vertices by connected component membership
  • Single Source Shortest Paths - Computes shortest paths from a source vertex
  • Triangle Enumeration - Enumerates all triangles in the graph
  • HITS Algorithm - Computes hub and authority scores
  • Clustering Coefficients - Local, global, and average clustering coefficients
  • Community Detection - Identifies communities using label propagation
  • Graph Metrics - Vertex and edge metrics such as counts and degree statistics

Usage Examples:

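import org.apache.flink.graph.library.{ConnectedComponents, SingleSourceShortestPaths}
import org.apache.flink.graph.library.link_analysis.PageRank

// Gelly's algorithms are Java classes, so constructor arguments are positional:
// Scala named arguments are not available for Java constructors.

// PageRank with damping factor 0.85 and 10 iterations
// (package location and key-type requirements vary across 1.x releases; check your version)
val pageRankResult = graph.run(new PageRank[Long, String, Double](0.85, 10))

// Connected Components - labels each vertex with the smallest vertex value in its component
val components = graph.run(new ConnectedComponents[Long, String, Double](10))

// Single Source Shortest Paths from the vertex with ID 1L (edge values are Double distances)
val shortestPaths = graph.run(new SingleSourceShortestPaths[Long, String](1L, 10))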
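Connected Components treats vertex values as comparable component labels and repeatedly lets each vertex adopt the smallest label among itself and its neighbours. It stops when no label changes or maxIterations is reached. The plain-Scala sketch below illustrates that label-propagation idea on in-memory data. It treats edges as undirected and initializes each label to the vertex ID, which is a common setup and an assumption here. It is not Gelly's code, and the names are hypothetical.

```scala
object ConnectedComponentsModel {
  def components(vertexIds: Seq[Long], edges: Seq[(Long, Long)], maxIterations: Int): Map[Long, Long] = {
    // Ignore direction: every edge connects both endpoints.
    val undirected = edges.flatMap { case (s, t) => Seq(s -> t, t -> s) }
    val neighbours: Map[Long, Seq[Long]] =
      undirected.groupBy(_._1).map { case (v, pairs) => v -> pairs.map(_._2) }
    var label: Map[Long, Long] = vertexIds.map(id => id -> id).toMap
    var iteration = 0
    var changed = true
    while (changed && iteration < maxIterations) {
      // Each vertex adopts the minimum of its own label and its neighbours' labels.
      val next = label.map { case (id, current) =>
        val candidates = neighbours.getOrElse(id, Seq.empty).map(label)
        id -> (current +: candidates).min
      }
      changed = next != label
      label = next
      iteration += 1
    }
    label
  }
}
```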

Neighborhood Operations

Operations for working with vertex neighborhoods and performing aggregations.

// Edge-based aggregations
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

// Neighbor-based aggregations
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunction[K, VV, EV, T], direction: EdgeDirection): DataSet[T]
def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction: NeighborsFunctionWithVertexValue[K, VV, EV, T], direction: EdgeDirection): DataSet[T]

// Reduction operations
def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction: EdgeDirection): DataSet[(K, VV)]
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection): DataSet[(K, EV)]
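reduceOnNeighbors combines, for each vertex, the values of its neighbours in the chosen direction with a pairwise reduce function. The direction options are:

- OUT: targets of outgoing edges.
- IN: sources of incoming edges.
- ALL: both.

The plain-Scala sketch below models this. The hypothetical Direction type stands in for Gelly's EdgeDirection. The model assumes vertices with no neighbour in that direction produce no entry.

```scala
object NeighborhoodModel {
  sealed trait Direction
  case object In extends Direction
  case object Out extends Direction
  case object All extends Direction

  // For each vertex, reduce the values of its neighbours in `direction` with `combine`.
  def reduceOnNeighbors[VV](values: Map[Long, VV], edges: Seq[(Long, Long)], direction: Direction)(
      combine: (VV, VV) => VV): Map[Long, VV] = {
    // (vertex, neighbour) pairs for the requested direction.
    val pairs: Seq[(Long, Long)] = direction match {
      case Out => edges
      case In  => edges.map { case (s, t) => t -> s }
      case All => edges ++ edges.map { case (s, t) => t -> s }
    }
    pairs.groupBy(_._1).map { case (v, ns) => v -> ns.map(n => values(n._2)).reduce(combine) }
  }
}
```

With values 1→10, 2→20, 3→30 and edges 1→2, 1→3, 2→3, summing over OUT neighbours gives 1→50 and 2→30. Vertex 3 has no outgoing edges.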

Graph Mutations

Methods for adding and removing vertices and edges.

// Adding elements
def addVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV): Graph[K, VV, EV]

// Removing elements
def removeVertex(vertex: Vertex[K, VV]): Graph[K, VV, EV]
def removeVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV]
def removeEdge(edge: Edge[K, EV]): Graph[K, VV, EV]
def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV]
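Each mutation returns a new Graph rather than modifying the existing one. Two behaviours are worth modelling:

- addEdge also adds the source and target vertices when they are not yet in the graph.
- removeVertex also drops every edge touching the removed vertex.

The plain-Scala sketch below illustrates both on immutable collections. It uses hypothetical names and is not Gelly's implementation.

```scala
object MutationModel {
  final case class SimpleVertex(id: Long, value: String)
  final case class SimpleEdge(source: Long, target: Long, value: Double)
  final case class SimpleGraph(vertices: Seq[SimpleVertex], edges: Seq[SimpleEdge])

  // addEdge: endpoint vertices missing from the graph are added with the edge.
  def addEdge(g: SimpleGraph, source: SimpleVertex, target: SimpleVertex, value: Double): SimpleGraph = {
    val known = g.vertices.map(_.id).toSet
    val newVertices = Seq(source, target).distinct.filterNot(v => known(v.id))
    SimpleGraph(g.vertices ++ newVertices, g.edges :+ SimpleEdge(source.id, target.id, value))
  }

  // removeVertex: the vertex and all edges incident to it disappear.
  def removeVertex(g: SimpleGraph, vertex: SimpleVertex): SimpleGraph =
    SimpleGraph(
      g.vertices.filterNot(_.id == vertex.id),
      g.edges.filterNot(e => e.source == vertex.id || e.target == vertex.id))
}
```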

Join Operations

Join graph data with external datasets to enrich vertex and edge information.

// Vertex joins
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV]
def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV): Graph[K, VV, EV]

// Edge joins
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV): Graph[K, VV, EV]
def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV]
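joinWithVertices updates only the vertices whose ID appears in the input DataSet, combining the old value and the joined value with the supplied function. Other vertices keep their value, and input keys with no matching vertex have no effect. The plain-Scala sketch below models that behaviour with hypothetical names. It also assumes at most one input entry per vertex ID.

```scala
object JoinModel {
  final case class SimpleVertex[K, VV](id: K, value: VV)

  // Matched vertices get fun(oldValue, inputValue); unmatched vertices are unchanged.
  def joinWithVertices[K, VV, T](vertices: Seq[SimpleVertex[K, VV]], input: Seq[(K, T)])(
      fun: (VV, T) => VV): Seq[SimpleVertex[K, VV]] = {
    val lookup: Map[K, T] = input.toMap
    vertices.map(v => lookup.get(v.id).fold(v)(t => SimpleVertex(v.id, fun(v.value, t))))
  }
}
```

The edge joins follow the same pattern. joinWithEdges matches on (source, target) pairs, while joinWithEdgesOnSource and joinWithEdgesOnTarget match on a single endpoint.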

Describes: mavenpkg:maven/org.apache.flink/flink-gelly-scala_2.10@1.3.x