Apache Flink Scala API

The Apache Flink Scala API provides type-safe distributed data processing with idiomatic Scala constructs, functional programming features, and seamless runtime integration. This module (flink-scala) covers Flink's batch DataSet API, enabling Scala developers to write data processing applications with native Scala types, case classes, pattern matching, and functional transformations; streaming is provided by the separate flink-streaming-scala module.

Package Information

  • Package Name: org.apache.flink:flink-scala_2.10
  • Package Type: maven
  • Language: Scala (2.10)
  • Version: 1.3.3
  • Installation: Add to your pom.xml:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.10</artifactId>
      <version>1.3.3</version>
    </dependency>

Core Imports

// The wildcard import is required: it brings the Scala DataSet API and the
// implicit TypeInformation macro into scope
import org.apache.flink.api.scala._

// Individual classes can also be imported explicitly
import org.apache.flink.api.scala.{ExecutionEnvironment, DataSet}

Basic Usage

import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create a DataSet from a collection
val data = env.fromElements(1, 2, 3, 4, 5)

// Transform the data
val result = data
  .map(_ * 2)
  .filter(_ > 4)
  
// print() triggers execution and prints the results
result.print()

Architecture

The Flink Scala API is built around several core abstractions:

  • ExecutionEnvironment: Entry point for creating and executing Flink batch programs
  • DataSet[T]: Immutable distributed collection with type-safe transformations
  • GroupedDataSet[T]: DataSet grouped by key for aggregation operations
  • Type System: Comprehensive TypeInformation system for Scala types including case classes, Option, Either, Try
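
A minimal sketch showing how these abstractions fit together; the Sale case class and its values are made up for illustration:

import org.apache.flink.api.scala._

case class Sale(product: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment  // entry point

// DataSet[Sale]: TypeInformation for the case class is derived implicitly
val sales: DataSet[Sale] = env.fromElements(
  Sale("books", 10.0), Sale("games", 5.0), Sale("books", 2.5))

// groupBy on a case-class field name yields a GroupedDataSet[Sale]
val totals: DataSet[Sale] = sales
  .groupBy("product")
  .reduce((a, b) => Sale(a.product, a.amount + b.amount))

totals.print()  // triggers execution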

Capabilities

Execution Environment

The ExecutionEnvironment is the main entry point for Flink batch programs, providing data source creation and execution control.

object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
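
As a usage sketch (the job name and output path are arbitrary): sinks are lazy, so a program defines them first and then runs them with execute():

import org.apache.flink.api.scala._

// Local environment with explicit parallelism (a minimal sketch)
val env = ExecutionEnvironment.createLocalEnvironment(parallelism = 2)
env.setParallelism(4)        // can be changed after creation
println(env.getParallelism)  // 4

// Nothing runs until execute() is called
env.generateSequence(1, 100).writeAsText("/tmp/numbers")  // placeholder path
val result = env.execute("sequence-job")                  // arbitrary job name
println(s"ran in ${result.getNetRuntime} ms")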

See docs/execution-environment.md for the full reference.

Data Sources and Sinks

Create DataSets from various sources and write results to different output formats.

class ExecutionEnvironment {
  def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def generateSequence(from: Long, to: Long): DataSet[Long]
}

class DataSet[T] {
  def writeAsText(filePath: String): DataSink[T]
  def print(): Unit
  def collect(): Seq[T]
}
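
A small sketch exercising the sources and sinks above; the file paths are placeholders:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val fromSeq: DataSet[Int]      = env.fromCollection(Seq(1, 2, 3))
val greetings: DataSet[String] = env.fromElements("hello", "world")
val lines: DataSet[String]     = env.readTextFile("/path/to/input.txt")  // placeholder path
val ids: DataSet[Long]         = env.generateSequence(1, 1000)

// collect() and print() are eager: each triggers execution itself
val localInts: Seq[Int] = fromSeq.collect()
greetings.print()

// writeAsText() is lazy: it only runs when the program is executed
ids.writeAsText("/path/to/output")
env.execute("write sequence")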

See docs/data-sources-sinks.md for the full reference.

Transformations

Core transformation operations for processing data with functional programming patterns.

class DataSet[T] {
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]  
  def filter(fun: T => Boolean): DataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def distinct(): DataSet[T]
  def union(other: DataSet[T]): DataSet[T]
}
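
A sketch chaining these transformations end to end:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val words: DataSet[String] = env
  .fromElements("to be", "or not", "to be")
  .flatMap(_.split("\\s+"))  // one output element per word
  .filter(_.nonEmpty)

val unique: DataSet[String]   = words.map(_.toUpperCase).distinct()
val combined: DataSet[String] = unique.union(env.fromElements("EXTRA"))

// reduce folds the whole DataSet down to a single element
val longest: DataSet[String] =
  combined.reduce((a, b) => if (a.length >= b.length) a else b)

longest.print()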

See docs/transformations.md for the full reference.

Grouping and Aggregation

Group data by keys and perform aggregation operations with type-safe field access.

class DataSet[T] {
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def groupBy(fields: Int*): GroupedDataSet[T]
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

class GroupedDataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def sum(field: Int): DataSet[T]
  def max(field: Int): DataSet[T] 
  def min(field: Int): DataSet[T]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
}
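
The canonical word count as a sketch, grouping on tuple field 0 and summing field 1:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val counts: DataSet[(String, Int)] = env
  .fromElements("to be or not to be")
  .flatMap(_.toLowerCase.split("\\s+"))
  .map(word => (word, 1))
  .groupBy(0)  // group on the word (tuple field 0)
  .sum(1)      // sum the counts (tuple field 1)

counts.print()  // e.g. (be,2), (not,1), (or,1), (to,2)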

See docs/grouping-aggregation.md for the full reference.

Joins and CoGroups

Combine multiple DataSets using joins, co-groups, and cross products with flexible key selection.

class DataSet[T] {
  def join[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
  def cross[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
}

class UnfinishedJoinOperation[T, O] {
  def where[K: TypeInformation](keySelector: T => K): UnfinishedJoinOperationWhere[T, O]
  def where(fields: Int*): UnfinishedJoinOperationWhere[T, O]
}
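
A join sketch with key selectors; equalTo() on the right-hand input completes the key pair, and the data here is made up:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val users: DataSet[(Int, String)]  = env.fromElements((1, "ada"), (2, "bob"))
val orders: DataSet[(Int, Double)] = env.fromElements((1, 9.99), (1, 5.0), (2, 3.5))

// where() selects the key on the left input, equalTo() on the right;
// the function block projects each matched pair into a result
val joined: DataSet[(String, Double)] = users
  .join(orders)
  .where(_._1)
  .equalTo(_._1) { (user, order) => (user._2, order._2) }

joined.print()  // (ada,9.99), (ada,5.0), (bob,3.5)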

See docs/joins-cogroups.md for the full reference.

Iterations

Support for iterative algorithms with both bulk iteration and delta iteration patterns.

class DataSet[T] {
  def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
  def iterateWithTermination(maxIterations: Int)(stepFunction: DataSet[T] => (DataSet[T], DataSet[_])): DataSet[T]
  def iterateDelta[R: TypeInformation: ClassTag](
    workset: DataSet[R], 
    maxIterations: Int, 
    keyFields: Array[String]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
}
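
A bulk-iteration sketch, adapted from the well-known Monte Carlo pi-estimation pattern:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Each pass samples one random point and counts hits inside the unit circle
val initial = env.fromElements(0)
val count = initial.iterate(10000) { iterationInput =>
  iterationInput.map { i =>
    val x = Math.random()
    val y = Math.random()
    i + (if (x * x + y * y < 1) 1 else 0)
  }
}

val pi = count.map(c => c / 10000.0 * 4)
pi.print()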

See docs/iterations.md for the full reference.

Type System and Serialization

Comprehensive type information system supporting Scala types with macro-based code generation.

// Implicit type information generation
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]

// Core type information classes
class CaseClassTypeInfo[T] extends TypeInformation[T]
class OptionTypeInfo[T] extends TypeInformation[Option[T]]
class EitherTypeInfo[A, B] extends TypeInformation[Either[A, B]]
class TryTypeInfo[T] extends TypeInformation[Try[T]]
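
A sketch of what the macro provides: TypeInformation materialized implicitly for a hypothetical case class and for standard Scala wrappers:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

case class Point(x: Double, y: Double)  // hypothetical case class

// The createTypeInformation macro materializes these at compile time
val pointInfo: TypeInformation[Point]        = createTypeInformation[Point]
val optionInfo: TypeInformation[Option[Int]] = createTypeInformation[Option[Int]]
val eitherInfo: TypeInformation[Either[String, Long]] =
  createTypeInformation[Either[String, Long]]

println(pointInfo)  // a CaseClassTypeInfo describing Point's field layout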

See docs/type-system.md for the full reference.

Hadoop Integration

Native integration with Hadoop MapReduce and MapRed input/output formats.

class ExecutionEnvironment {
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapreduceInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V], 
    inputPath: String
  ): DataSet[(K, V)]
}
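
A sketch following the Hadoop-compatibility pattern from the Flink documentation, reading lines with the mapreduce TextInputFormat; the HDFS path is a placeholder and the Hadoop classes must be on the classpath:

import org.apache.flink.api.scala._
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val env = ExecutionEnvironment.getExecutionEnvironment

// Keys are byte offsets into the file, values are the lines
val input: DataSet[(LongWritable, Text)] = env.readHadoopFile(
  new TextInputFormat, classOf[LongWritable], classOf[Text],
  "hdfs:///path/to/input")  // placeholder path

val lines: DataSet[String] = input.map(_._2.toString)
lines.print()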

See docs/hadoop-integration.md for the full reference.

Types

// Core execution types
class ExecutionEnvironment
class DataSet[T]
class GroupedDataSet[T] 
class CrossDataSet[T, O]
class AggregateDataSet[T]
class CoGroupDataSet[T, O]

// Configuration and results
class ExecutionConfig
class JobExecutionResult
class JobID

// Resource management
class ResourceSpec

// Type information
abstract class TypeInformation[T]
class CaseClassTypeInfo[T] extends TypeInformation[T]
class OptionTypeInfo[T] extends TypeInformation[Option[T]]
class EitherTypeInfo[A, B] extends TypeInformation[Either[A, B]]

// Serialization
abstract class TypeSerializer[T]
class CaseClassSerializer[T] extends TypeSerializer[T]

// Operators and functions  
trait MapFunction[T, O]
trait FlatMapFunction[T, O]
trait FilterFunction[T]
trait ReduceFunction[T]
trait GroupReduceFunction[T, O]
trait JoinFunction[T, O, R]
trait CoGroupFunction[T, O, R]
trait CrossFunction[T, O, R]

// Aggregation types
class Aggregations
class Order

// Join operations
class UnfinishedJoinOperation[T, O]
class UnfinishedJoinOperationWhere[T, O]  
class UnfinishedJoinOperationWhereEqual[T, O]
class JoinDataSet[T, O]

// Output types
class DataSink[T]
trait OutputFormat[T]
class TextOutputFormat[T] extends OutputFormat[T]

// Input types  
trait InputFormat[T, S]
class TextInputFormat extends InputFormat[String, FileInputSplit]

// Utilities
object DataSetUtils
class ScalaGauge[T]

// Accumulator types
trait Accumulator[V, R] 
class IntCounter extends Accumulator[Int, Int]
class LongCounter extends Accumulator[Long, Long]
class DoubleCounter extends Accumulator[Double, Double]
class Histogram extends Accumulator[Int, java.util.Map[Int, Int]]
class ListAccumulator[T] extends Accumulator[T, java.util.ArrayList[T]]
class IntMaximum extends Accumulator[Int, Int]
class IntMinimum extends Accumulator[Int, Int]
class DoubleMaximum extends Accumulator[Double, Double]
class DoubleMinimum extends Accumulator[Double, Double]

// Rich functions
trait RichFunction
abstract class RichMapFunction[T, O] extends MapFunction[T, O] with RichFunction
abstract class RichFlatMapFunction[T, O] extends FlatMapFunction[T, O] with RichFunction
abstract class RichFilterFunction[T] extends FilterFunction[T] with RichFunction
abstract class RichReduceFunction[T] extends ReduceFunction[T] with RichFunction
abstract class RichGroupReduceFunction[T, O] extends GroupReduceFunction[T, O] with RichFunction
abstract class RichMapPartitionFunction[T, O] extends MapPartitionFunction[T, O] with RichFunction

// Runtime context
trait RuntimeContext
trait BroadcastVariableInitializer[T, C]

// Configuration
class Configuration
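
To tie several of these together, a sketch of a RichMapFunction that registers an IntCounter accumulator through the RuntimeContext; the accumulator name, job name, and output path are arbitrary:

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Counts processed elements while upper-casing them
class CountingUpper extends RichMapFunction[String, String] {
  private val counter = new IntCounter()

  override def open(parameters: Configuration): Unit =
    getRuntimeContext.addAccumulator("num-lines", counter)  // arbitrary name

  override def map(value: String): String = {
    counter.add(1)
    value.toUpperCase
  }
}

val env = ExecutionEnvironment.getExecutionEnvironment
env.fromElements("a", "b", "c")
  .map(new CountingUpper)
  .writeAsText("/tmp/out")  // placeholder path

val result = env.execute("accumulator-sketch")
println(result.getAccumulatorResult[Int]("num-lines"))  // 3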