
tessl/maven-org-apache-flink--flink-scala_2-11

Apache Flink Scala API module that provides elegant and fluent Scala language bindings for Flink's distributed stream and batch processing framework

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-scala_2.11@1.14.x (maven)

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala_2-11@1.14.0


Apache Flink Scala API

Apache Flink Scala API provides elegant and fluent Scala language bindings for Flink's distributed stream and batch processing framework. This module enables Scala developers to write type-safe data processing applications using idiomatic Scala constructs, including case classes, pattern matching, and functional programming patterns.

Package Information

  • Package Name: flink-scala_2.11
  • Package Type: maven
  • Language: Scala
  • Installation: Add to your Maven pom.xml:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>1.14.6</version>
    </dependency>

Core Imports

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment

For type utilities:

import org.apache.flink.api.scala.typeutils.Types

For extension methods (partial function support):

import org.apache.flink.api.scala.extensions._

Basic Usage

import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create DataSet from collection
val data = env.fromCollection(List(1, 2, 3, 4, 5))

// Transform data
val result = data
  .filter(_ > 2)
  .map(_ * 2)
  .reduce(_ + _)

// Execute and get result
println(result.collect().head) // Prints: 24

Architecture

The Flink Scala API is built around several key components; a short sketch combining them follows the list:

  • ExecutionEnvironment: Entry point for creating and configuring Flink programs
  • DataSet[T]: Core abstraction representing distributed collections with type safety
  • Type System: Automatic type information generation via Scala macros
  • Serialization Framework: Specialized serializers for Scala types (Option, Either, Try, case classes)
  • Fluent API: Method chaining for building complex data processing pipelines
  • Extensions: Partial function support for pattern matching in transformations
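
As a rough sketch of how these pieces fit together, the snippet below combines a case class, the extension methods from org.apache.flink.api.scala.extensions (filterWith and mapWith accept pattern-matching anonymous functions), and fluent method chaining. The Reading type is a hypothetical example, not part of the library.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

// Hypothetical record type used only for this sketch
case class Reading(sensor: String, value: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

val readings = env.fromElements(
  Reading("a", 1.0), Reading("a", 3.0), Reading("b", 2.0))

// Case classes, pattern matching (via the *With extensions) and fluent chaining
val totals = readings
  .filterWith { case Reading(_, v) => v > 1.0 }
  .mapWith { case Reading(sensor, v) => (sensor, v) }
  .groupBy(0)
  .sum(1)

totals.print() // (a,3.0), (b,2.0)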

Capabilities

Execution Environment

Environment setup, data source creation, and job execution management. The primary entry point for all Flink programs.

object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
  def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
  def readTextFile(filePath: String): DataSet[String]
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
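
A small usage sketch of these entry points; the job name and output path are hypothetical.

import org.apache.flink.api.scala._

// A local environment is convenient for development and tests;
// on a cluster, getExecutionEnvironment picks up the surrounding context.
val env = ExecutionEnvironment.createLocalEnvironment()
env.setParallelism(2)

val numbers = env.fromElements(1, 2, 3)
numbers.writeAsText("/tmp/numbers-out") // hypothetical output path

// execute() submits the job and returns a JobExecutionResult
val result = env.execute("numbers-job")
println(s"Ran in ${result.getNetRuntime} ms")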

See docs/execution-environment.md for the full reference.

Data Transformations

Core data processing operations including map, filter, reduce, and aggregations. The heart of Flink's data processing capabilities.

class DataSet[T] {
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filter(fun: T => Boolean): DataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def union(other: DataSet[T]): DataSet[T]
}
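
A minimal word-count sketch built from these operations:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val lines = env.fromElements("to be or not to be", "to think")

// Classic word count: flatMap to words, map to pairs, group and reduce
val counts = lines
  .flatMap(_.toLowerCase.split("\\s+"))
  .map(word => (word, 1))
  .groupBy(_._1)
  .reduce((a, b) => (a._1, a._2 + b._2))

counts.print() // e.g. (to,3), (be,2), ...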

See docs/data-transformations.md for the full reference.

Grouping and Aggregation

Group-wise operations and aggregation functions for summarizing and analyzing grouped data.

class GroupedDataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def sum(field: String): AggregateDataSet[T]
  def max(field: String): AggregateDataSet[T]
  def min(field: String): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
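
A short sketch of field-name aggregation. Sale is a hypothetical case class; the example assumes case classes can be aggregated by field name because the Scala API treats them as tuple-like types.

import org.apache.flink.api.scala._

// Hypothetical record type for illustration
case class Sale(region: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val sales = env.fromElements(
  Sale("eu", 10.0), Sale("eu", 5.0), Sale("us", 7.5))

// Total amount per region via field-name aggregation
val perRegion = sales
  .groupBy(_.region)
  .sum("amount")

perRegion.print() // Sale(eu,15.0), Sale(us,7.5)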

See docs/grouping-aggregation.md for the full reference.

Binary Operations

Join, cross, and coGroup operations for combining multiple DataSets.

class DataSet[T] {
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]  
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
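
A sketch of an inner join using key selector functions; the users and orders datasets are made up for illustration.

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val users  = env.fromElements((1, "Alice"), (2, "Bob"))
val orders = env.fromElements((1, 42.0), (1, 7.5), (2, 12.0))

// Join on the user id; where/equalTo take key selector functions and
// apply builds the joined result from each matching pair
val joined = users
  .join(orders)
  .where(_._1)
  .equalTo(_._1)
  .apply { (user, order) => (user._2, order._2) }

joined.print() // (Alice,42.0), (Alice,7.5), (Bob,12.0)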

See docs/binary-operations.md for the full reference.

Type System and Serialization

Comprehensive type information system and Scala-specific serialization support.

object Types {
  def of[T: TypeInformation]: TypeInformation[T]
  def OPTION[A](valueType: TypeInformation[A]): TypeInformation[Option[A]]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
  def TRY[A](valueType: TypeInformation[A]): TypeInformation[Try[A]]
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
}

// Automatic type information generation via macro
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
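
A brief sketch of obtaining type information explicitly; Point is a hypothetical case class used only for illustration.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical case class used only to demonstrate type information
case class Point(x: Double, y: Double)

// The createTypeInformation macro is applied wherever an implicit
// TypeInformation is required, including for nested Scala types
val pointInfo: TypeInformation[Point] = createTypeInformation[Point]
val optionInfo: TypeInformation[Option[Point]] = createTypeInformation[Option[Point]]

// Or build it explicitly via the Types factory declared above
val stringOption = Types.OPTION(createTypeInformation[String])

println(pointInfo.getTypeClass) // the runtime class of Point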

See docs/type-system.md for the full reference.

Partitioning and Distribution

Control over data distribution and partitioning strategies across the cluster.

class DataSet[T] {
  def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
  def rebalance(): DataSet[T]
  def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
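
A sketch combining hash partitioning with a per-partition sort; Order here is the enum from org.apache.flink.api.common.operators.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements((3, "c"), (1, "a"), (2, "b"))

// Hash-partition by key, then sort each partition locally by that key
val partitioned = events
  .partitionByHash(_._1)
  .sortPartition(_._1, Order.ASCENDING)

// Spread data evenly across partitions when the distribution is skewed
val balanced = events.rebalance()

partitioned.print()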

See docs/partitioning-distribution.md for the full reference.

Input and Output Operations

Reading data from various sources and writing results to different sinks.

class ExecutionEnvironment {
  def readTextFile(filePath: String): DataSet[String]
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

class DataSet[T] {
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]
  def writeAsCsv(filePath: String): DataSink[T]
  def print(): Unit
  def collect(): Seq[T]  
}
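
A sketch reading a CSV file into a case class and writing the filtered result; Person and the file paths are hypothetical.

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem

// Hypothetical record type; CSV columns are mapped onto its fields in order
case class Person(name: String, age: Int)

val env = ExecutionEnvironment.getExecutionEnvironment

val people = env.readCsvFile[Person]("/tmp/people.csv") // hypothetical input path

val adults = people.filter(_.age >= 18)

// Overwrite any previous output instead of failing on existing files
adults.writeAsText("/tmp/adults", FileSystem.WriteMode.OVERWRITE)
env.execute("csv-to-text")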

See docs/input-output.md for the full reference.

Utility Functions

Advanced utilities for sampling, indexing, and data analysis.

implicit class DataSetUtils[T](dataSet: DataSet[T]) {
  def zipWithIndex: DataSet[(Long, T)]
  def zipWithUniqueId: DataSet[(Long, T)]
  def sample(withReplacement: Boolean, fraction: Double): DataSet[T]
  def countElementsPerPartition: DataSet[(Int, Long)]
}
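
A sketch of the utility wrapper, assuming the implicit class is brought into scope with import org.apache.flink.api.scala.utils._:

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

val env = ExecutionEnvironment.getExecutionEnvironment
val names = env.fromElements("ada", "grace", "alan")

// Assign consecutive indexes to the elements (assignment order is not deterministic)
val indexed = names.zipWithIndex

// Keep roughly half of the elements, sampling without replacement
val sampled = names.sample(withReplacement = false, fraction = 0.5)

indexed.print() // e.g. (0,ada), (1,grace), (2,alan)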

See docs/utilities.md for the full reference.

Types

Core Types

trait TypeInformation[T] {
  def getTypeClass: Class[T]
  def createSerializer(config: ExecutionConfig): TypeSerializer[T]
}

class JobExecutionResult {
  def getJobID: JobID
  def getNetRuntime: Long
  def getNetRuntime(timeUnit: TimeUnit): Long
}

sealed trait Order
object Order {
  case object ASCENDING extends Order
  case object DESCENDING extends Order
}

abstract class Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int
}
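
A sketch of a custom Partitioner used with partitionCustom; EvenOddPartitioner is a made-up example.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.Partitioner

// Toy partitioner that routes even and odd keys to different partitions
class EvenOddPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int =
    if (numPartitions < 2) 0 else math.abs(key) % 2
}

val env = ExecutionEnvironment.getExecutionEnvironment
val pairs = env.fromElements((1, "a"), (2, "b"), (3, "c"), (4, "d"))

// Use the first tuple field as the partitioning key
val routed = pairs.partitionCustom(new EvenOddPartitioner, _._1)

routed.print()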