tessl/maven-org-apache-flink--flink-scala_2-12

Apache Flink Scala API providing type-safe operations and functional programming paradigms for distributed stream and batch processing applications.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-scala_2.12@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala_2-12@1.20.0


Flink Scala API

Apache Flink Scala API provides type-safe operations and functional programming paradigms for distributed stream and batch processing applications. It offers elegant Scala APIs with case class support, pattern matching, and functional composition patterns for building scalable data processing pipelines.

⚠️ Deprecation Notice: All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should migrate to the Java version of the DataStream and/or Table API.

Package Information

  • Package Name: flink-scala_2.12
  • Package Type: maven
  • Language: Scala
  • Installation: Add to Maven pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.12</artifactId>
  <version>1.20.2</version>
</dependency>

Core Imports

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment

For specific functionality:

import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.extensions._  // For partial function support

Basic Usage

import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create dataset from elements
val data: DataSet[String] = env.fromElements("Hello", "World", "from", "Flink")

// Transform data
val wordCounts = data
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .groupBy(0)
  .sum(1)

// Output results; print() triggers execution of the DataSet program,
// so no env.execute() call is needed here
wordCounts.print()

// env.execute("Word Count Example") would only be required if the program
// ended in lazy sinks such as writeAsText

// Example with partial functions (requires extensions import)
import org.apache.flink.api.scala.extensions._

case class Sale(region: String, product: String, amount: Double)
val sales = env.fromElements(
  Sale("US", "ProductA", 100.0),
  Sale("EU", "ProductA", 150.0)
)

val result = sales
  .filterWith { case Sale(region, _, _) => region == "US" }
  .mapWith { case Sale(region, product, amount) => (product, amount) }
  .groupingBy(_._1)
  .sum(1)

Architecture

The Flink Scala API is built around these core concepts:

  • ExecutionEnvironment: Context for creating and executing Flink programs
  • DataSet: Immutable collection representing distributed data
  • Transformations: Operations like map, filter, join that create new DataSets
  • Actions: Operations like collect, print that trigger execution
  • Type System: Automatic TypeInformation generation for Scala types
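
The following minimal sketch (with assumed element values) shows how these pieces fit together: transformations build a lazy plan, and an action such as collect() triggers execution.

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Transformations are lazy: nothing runs yet
val numbers: DataSet[Long] = env.generateSequence(1, 10)
val squares: DataSet[Long] = numbers.map(n => n * n)

// Actions such as collect() or print() trigger execution of the plan
val result: Seq[Long] = squares.collect()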

Capabilities

Execution Environment

The entry point for all Flink Scala programs, providing methods to create DataSets and configure execution.

object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def getConfig: ExecutionConfig
  
  // Data source creation
  def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String, /* ... */): DataSet[T]
  def generateSequence(from: Long, to: Long): DataSet[Long]
  
  // Execution
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  def executeAsync(): JobClient
}
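
A small sketch of environment setup; the parallelism value and file path are assumptions, not recommendations.

import org.apache.flink.api.scala._

// Explicit local environment; in a deployed job, getExecutionEnvironment
// picks up the surrounding cluster context instead
val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)

val lines: DataSet[String] = env.readTextFile("/tmp/input.txt")   // hypothetical path
val numbers: DataSet[Long] = env.generateSequence(1, 1000)

// print() triggers execution for DataSet programs, so no execute() call is needed here
numbers.map(_ * 2).print()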

See docs/execution-environment.md for the full reference.

DataSet Operations

Core data transformation and processing operations on distributed datasets.

class DataSet[T] {
  // Basic transformations
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filter(fun: T => Boolean): DataSet[T]
  def distinct(): DataSet[T]
  
  // Grouping
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def groupBy(fields: Int*): GroupedDataSet[T]
  
  // Aggregations
  def reduce(fun: (T, T) => T): DataSet[T]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]
  
  // Output operations
  def collect(): Seq[T]
  def print(): Unit
  def count(): Long
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]
}
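
A brief sketch combining grouping, aggregation, and output operations; the sensor tuples are assumed sample data.

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// (sensor id, reading) pairs; field 0 is the id, field 1 the value
val readings: DataSet[(String, Int)] = env.fromElements(
  ("sensorA", 3), ("sensorB", 7), ("sensorA", 5)
)

val perSensorSum = readings.groupBy(0).sum(1)        // sum readings per sensor
val distinctIds  = readings.map(_._1).distinct()     // unique sensor ids
val total        = readings.map(_._2).reduce(_ + _)  // global sum via reduce

println(perSensorSum.collect())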

See docs/dataset-operations.md for the full reference.

Join Operations

Joining datasets on keys with various join types and optimization hints.

class DataSet[T] {
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}

class UnfinishedJoinOperation[L, R] {
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedJoinOperation[L, R]
  def where(fields: Int*): HalfUnfinishedJoinOperation[L, R]
}

class JoinDataSet[L, R] {
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
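
A hedged sketch of the where/equalTo builder; the case classes and values are assumptions made for illustration.

import org.apache.flink.api.scala._

case class User(id: Int, name: String)
case class Purchase(userId: Int, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val users     = env.fromElements(User(1, "Ada"), User(2, "Bob"))
val purchases = env.fromElements(Purchase(1, 30.0), Purchase(1, 12.5), Purchase(2, 8.0))

// where() selects the key on the left input, equalTo() on the right;
// apply() builds the joined result
val joined: DataSet[(String, Double)] = users
  .join(purchases)
  .where(_.id)
  .equalTo(_.userId)
  .apply { (user, purchase) => (user.name, purchase.amount) }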

See docs/join-operations.md for the full reference.

Grouped DataSet Operations

Operations available on grouped datasets including sorting and specialized aggregations.

class GroupedDataSet[T] {
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]
  def maxBy(fields: Int*): DataSet[T]
  def minBy(fields: Int*): DataSet[T]
  def first(n: Int): DataSet[T]
}
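
An illustrative sketch of grouped sorting and group functions; the score tuples and field positions are assumptions.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment
val scores: DataSet[(String, Int)] = env.fromElements(
  ("alice", 90), ("alice", 72), ("bob", 88), ("bob", 95)
)

// Top score per player: group on field 0, sort each group on field 1, keep the first row
val topScores = scores
  .groupBy(0)
  .sortGroup(1, Order.DESCENDING)
  .first(1)

// reduceGroup receives the whole group as an Iterator
val totals = scores
  .groupBy(0)
  .reduceGroup { (group: Iterator[(String, Int)]) =>
    val rows = group.toList
    (rows.head._1, rows.map(_._2).sum)
  }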

See docs/grouped-dataset-operations.md for the full reference.

Type System

Scala-specific type information system for serialization and type safety.

object Types {
  // Basic types
  val STRING: TypeInformation[String]
  val INT: TypeInformation[Int]
  val LONG: TypeInformation[Long]
  val DOUBLE: TypeInformation[Double]
  val BOOLEAN: TypeInformation[Boolean]
  
  // Factory methods
  def of[T: TypeInformation]: TypeInformation[T]
  def TUPLE[T: TypeInformation]: TypeInformation[T]
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
}

// Implicit type information generation (macro-based)
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
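
A short sketch of obtaining TypeInformation explicitly; the Sensor case class is an assumed example type.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.typeutils.Types

case class Sensor(id: String, value: Double)

// The macro-based implicit derives TypeInformation for Scala types,
// including case classes and tuples
val caseClassInfo: TypeInformation[Sensor] = createTypeInformation[Sensor]
val tupleInfo: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]

// Predefined values and factory methods from the Types object
val stringInfo: TypeInformation[String] = Types.STRING
val sensorInfo: TypeInformation[Sensor] = Types.of[Sensor]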

See docs/type-system.md for the full reference.

Partitioned and Sorted DataSets

Specialized DataSet types for partitioned and sorted data operations.

class PartitionSortedDataSet[T] extends DataSet[T] {
  def sortPartition(field: Int, order: Order): DataSet[T]
  def sortPartition(field: String, order: Order): DataSet[T]
  // Note: Cannot chain key selector functions
}
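
A minimal sketch of partition-local sorting; the tuple values are assumptions.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment
val pairs = env.fromElements(("b", 2), ("a", 3), ("a", 1))

// Field-position sorts can be chained on the same partition;
// a key-selector sort cannot be combined with further sortPartition calls
val sorted = pairs
  .sortPartition(0, Order.ASCENDING)
  .sortPartition(1, Order.DESCENDING)

sorted.print()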

Extension Methods for Partial Functions

Scala-friendly extension methods that accept partial functions for pattern matching.

// Import extensions
import org.apache.flink.api.scala.extensions._

class OnDataSet[T] {
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filterWith(fun: T => Boolean): DataSet[T]
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
}
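
A small sketch using these extensions beyond the Basic Usage example above; the Event case class and tag values are assumptions.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

case class Event(user: String, tags: Seq[String])

val env = ExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(
  Event("ada", Seq("scala", "flink")),
  Event("bob", Seq("flink"))
)

// Partial functions destructure the case class directly
val tagCounts = events
  .flatMapWith { case Event(_, tags) => tags }
  .mapWith(tag => (tag, 1))
  .groupingBy(_._1)
  .sum(1)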

Utility Functions

Additional utilities for DataSet operations including sampling, partitioning, and indexing.

// Import utilities
import org.apache.flink.api.scala.utils._

class DataSet[T] {
  // Available via implicit conversions from utils package
  def countElementsPerPartition(): DataSet[(Int, Long)]
  def zipWithIndex(): DataSet[(Long, T)]
  def zipWithUniqueId(): DataSet[(Long, T)]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.RNG.nextLong()): DataSet[T]
  def sampleWithSize(withReplacement: Boolean, numSamples: Int, seed: Long = Utils.RNG.nextLong()): DataSet[T]
  def checksumHashCode(): ChecksumHashCode
}
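
A quick sketch of the implicit utility methods; the word list and seed are assumptions.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

val env = ExecutionEnvironment.getExecutionEnvironment
val words = env.fromElements("alpha", "beta", "gamma", "delta")

// Pair every element with a consecutive index
val indexed: DataSet[(Long, String)] = words.zipWithIndex

// Bernoulli sample (~50%) without replacement, fixed seed for reproducibility
val sampled: DataSet[String] = words.sample(withReplacement = false, fraction = 0.5, seed = 42L)

indexed.print()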

See docs/utility-functions.md for the full reference.

Common Types

// Execution configuration
class ExecutionConfig {
  def setParallelism(parallelism: Int): ExecutionConfig
  def getParallelism: Int
  def enableClosureCleaner(): ExecutionConfig
  def disableClosureCleaner(): ExecutionConfig
}

// Job execution result
class JobExecutionResult {
  def getNetRuntime: Long
  def getAccumulatorResult[T](accumulatorName: String): T
}

// Aggregation types
object Aggregations extends Enumeration {
  val SUM, MAX, MIN = Value
}

// File system write modes
object FileSystem {
  object WriteMode extends Enumeration {
    val NO_OVERWRITE, OVERWRITE = Value
  }
}

// Ordering for sorting
object Order extends Enumeration {
  val ASCENDING, DESCENDING = Value
}
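
A brief sketch of how these common types show up in a program; the output path is a hypothetical placeholder.

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem

val env = ExecutionEnvironment.getExecutionEnvironment
env.getConfig.setParallelism(2)

val data = env.generateSequence(1, 100).map(_.toString)

// writeAsText is a lazy sink, so an explicit execute() is required
data.writeAsText("/tmp/flink-out", FileSystem.WriteMode.OVERWRITE)  // hypothetical path
val result = env.execute("Write Example")
println(s"Runtime: ${result.getNetRuntime} ms")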

Error Handling

The Flink Scala API can throw these exceptions:

  • IllegalArgumentException - Invalid parameters or field names
  • UnsupportedOperationException - Unsupported operations on certain data types
  • RuntimeException - Runtime execution errors
  • IOException - File I/O related errors
  • JobExecutionException - Job execution failures
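
Given the exceptions above, job submission can be wrapped defensively; here is a hedged sketch using scala.util.Try (output path and job name are illustrative).

import scala.util.{Failure, Success, Try}
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.generateSequence(1, 10).map(_ * 2).writeAsText("/tmp/out")  // hypothetical path

Try(env.execute("Guarded Job")) match {
  case Success(result) =>
    println(s"Finished in ${result.getNetRuntime} ms")
  case Failure(e) =>
    // Covers JobExecutionException, IOException, and other runtime failures
    println(s"Job failed: ${e.getMessage}")
}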