tessl/maven-org-apache-flink--flink-streaming-scala-2-12

Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.

  • Workspace: tessl
  • Visibility: Public
  • Describes: mavenpkg:maven/org.apache.flink/flink-streaming-scala_2.12@1.20.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-streaming-scala-2-12@1.20.0


Flink Streaming Scala

Apache Flink Streaming Scala provides idiomatic Scala bindings for Apache Flink's streaming data processing capabilities. It wraps Flink's Java DataStream API with Scala constructs (Scala functions, tuples and case classes, and implicitly derived TypeInformation), letting developers build high-throughput, low-latency stream processing applications with familiar Scala syntax and compile-time type safety.

⚠️ Deprecation Notice: All Flink Scala APIs are deprecated since version 1.18.0 and will be removed in a future Flink major version. Users should migrate to the Java DataStream API or Table API while still being able to write applications in Scala. See FLIP-265 for details.

Package Information

  • Package Name: flink-streaming-scala_2.12
  • Package Type: maven
  • Language: Scala
  • Group ID: org.apache.flink
  • Artifact ID: flink-streaming-scala_2.12
  • Version: 1.20.2
  • Installation: Add to your Maven/SBT dependencies

Maven:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.20.2</version>
</dependency>

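SBT (%% appends the Scala binary version, so scalaVersion must be a 2.12.x release for this to resolve to flink-streaming-scala_2.12):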

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.20.2"

Core Imports

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function._

For specific functionality:

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala.AsyncDataStream
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.ProcessFunction

Basic Usage

import org.apache.flink.streaming.api.scala._

// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create data source
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Transform data
val doubled = numbers
  .map(_ * 2)
  .filter(_ > 5)

// Output results
doubled.print()

// Execute the job
env.execute("Simple Flink Job")

Architecture

Flink Streaming Scala is built around several key components:

  • Execution Environment: StreamExecutionEnvironment serves as the entry point for creating streaming applications
  • Data Streams: DataStream[T] represents bounded or unbounded streams of data with type safety
  • Keyed Streams: KeyedStream[T, K] enables partitioned processing and stateful operations
  • Windowing: Time and count-based windows for bounded computations on infinite streams
  • Connectors: Sources and sinks for various external systems
  • Processing Functions: Low-level processing functions for complex event-driven logic
  • Async I/O: High-performance async operations for external service calls

Capabilities

Execution Environment

Core environment setup and job execution functionality. The entry point for all Flink streaming applications.

object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}

class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
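The environment calls above fit together roughly as follows. This is a minimal sketch, assuming a Scala 2.12 script context (top-level statements, as in Basic Usage) and flink-clients 1.20.x on the classpath so the job runs in-process; the job name and the 10-second checkpoint interval are arbitrary.

```scala
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.streaming.api.scala._

// In-process environment with an explicit parallelism of 2.
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(2)

// Request a checkpoint every 10 seconds (this short bounded job may finish first).
env.enableCheckpointing(10000L)

env.fromElements("flink", "scala")
  .map(_.toUpperCase)
  .print()

// execute() blocks until the job finishes and returns its JobExecutionResult.
val result: JobExecutionResult = env.execute("Environment Sketch")
println(s"Net runtime: ${result.getNetRuntime} ms")
```

getNetRuntime reports the job's net execution time in milliseconds.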

See Execution Environment (execution-environment.md) for the full reference.

Data Sources and Streams

Core data stream operations including creation, transformation, and partitioning. The foundation for all stream processing operations.

class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def readTextFile(filePath: String): DataStream[String]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
}

class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def filter(fun: T => Boolean): DataStream[T]
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
}
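As a sketch of how these operations chain, here is a rolling word count over one in-memory line. It assumes the same script context and flink-clients dependency; setParallelism(1) is used only so that the collected output order is deterministic.

```scala
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // single task per operator keeps the output order stable

val counts: DataStream[(String, Int)] = env
  .fromElements("to be or not to be")
  .flatMap(_.split(" "))   // one element per word
  .filter(_.nonEmpty)
  .map(word => (word, 1))
  .keyBy(_._1)             // partition by word
  .sum(1)                  // rolling sum of the Int field, emitted on every update

// executeAndCollect runs the job and streams results back to the client.
val it = counts.executeAndCollect()
val results = try it.asScala.toList finally it.close()
```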

See Data Sources and Streams (data-streams.md) for the full reference.

Keyed Streams and Stateful Processing

Partitioned stream processing with state management and aggregations. Essential for stateful computations and windowed operations.

class KeyedStream[T, K] {
  def reduce(fun: (T, T) => T): DataStream[T]
  def sum(position: Int): DataStream[T]
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
  def process[R: TypeInformation](processFunction: KeyedProcessFunction[K, T, R]): DataStream[R]
  def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
}
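mapWithState keeps one state value per key. The sketch below, under the same assumptions, maintains a running total per key; the state is None the first time a key is seen, and returning Some(total) stores the new value.

```scala
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// Running total per key, kept in Flink-managed keyed state.
val totals: DataStream[(String, Int)] = env
  .fromElements(("a", 1), ("b", 2), ("a", 3))
  .keyBy(_._1)
  .mapWithState[(String, Int), Int] { (in: (String, Int), state: Option[Int]) =>
    val total = state.getOrElse(0) + in._2 // None on the first element for a key
    ((in._1, total), Some(total))          // emit the total and store it as the new state
  }

val it = totals.executeAndCollect()
val results = try it.asScala.toList finally it.close()
```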

See Keyed Streams and State (keyed-streams.md) for the full reference.

Windowing Operations

Time and count-based windowing for bounded computations on infinite streams. Supports various window types and aggregation functions.

class WindowedStream[T, K, W <: Window] {
  def reduce(function: (T, T) => T): DataStream[T]
  def aggregate[ACC: TypeInformation, R: TypeInformation](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}
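A minimal windowing sketch under the same assumptions. It uses countWindow(2), a KeyedStream method not listed above, to group every two elements per key; AverageAggregate is an illustrative AggregateFunction (not part of the library) that averages the Long field with a (sum, count) accumulator.

```scala
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

// Illustrative aggregate: average of the Long field, accumulator = (sum, count).
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator(): (Long, Long) = (0L, 0L)
  override def add(value: (String, Long), acc: (Long, Long)): (Long, Long) =
    (acc._1 + value._2, acc._2 + 1L)
  override def getResult(acc: (Long, Long)): Double = acc._1.toDouble / acc._2
  override def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) =
    (a._1 + b._1, a._2 + b._2)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val averages: DataStream[Double] = env
  .fromElements(("a", 2L), ("a", 4L), ("b", 10L), ("b", 20L))
  .keyBy(_._1)
  .countWindow(2)                   // fire once every 2 elements per key
  .aggregate(new AverageAggregate)  // incremental aggregation inside each window

val it = averages.executeAndCollect()
val results = try it.asScala.toList finally it.close()
```

Because aggregate updates the accumulator as elements arrive, the window never has to buffer all of its elements.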

See Windowing Operations (windowing.md) for the full reference.

Stream Connections and Joins

Multi-stream operations including unions, connects, joins, and co-processing. Enables complex multi-input stream processing patterns.

class DataStream[T] {
  def union(dataStreams: DataStream[T]*): DataStream[T]
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

class ConnectedStreams[IN1, IN2] {
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
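The difference between union and connect can be sketched as follows, under the same assumptions: union merges streams of the same type, while connect pairs streams of different types and maps each input with its own function into a common output type. Interleaving across inputs is not deterministic, so the results are sorted before use.

```scala
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val ints: DataStream[Int] = env.fromElements(1, 2)
val words: DataStream[String] = env.fromElements("x", "y")
val moreWords: DataStream[String] = env.fromElements("z")

// union: same element type, streams are merged into one.
val allWords: DataStream[String] = words.union(moreWords)

// connect: different element types, one function per input, shared output type.
val tagged: DataStream[String] = ints
  .connect(allWords)
  .map(i => s"int:$i", w => s"str:$w")

val it = tagged.executeAndCollect()
val results = try it.asScala.toList.sorted finally it.close()
```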

See Stream Connections and Joins (stream-connections.md) for the full reference.

Async I/O Operations

High-performance async operations for external service calls without blocking stream processing. Essential for enriching streams with external data.

object AsyncDataStream {
  def unorderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN], 
    asyncFunction: AsyncFunction[IN, OUT], 
    timeout: Long, 
    timeUnit: TimeUnit
  ): DataStream[OUT]
  
  def orderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN], 
    asyncFunction: AsyncFunction[IN, OUT], 
    timeout: Long, 
    timeUnit: TimeUnit
  ): DataStream[OUT]
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
}
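A sketch of an AsyncFunction, assuming the same setup. LengthLookup is a hypothetical stand-in for a real client call: it completes a Scala Future on the global execution context and forwards the value or failure to the ResultFuture. orderedWait is used so results keep input order, with a 1-second timeout per element.

```scala
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Hypothetical "remote lookup": computes the word length asynchronously.
class LengthLookup extends AsyncFunction[String, (String, Int)] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[(String, Int)]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    Future(input.length).onComplete {
      case Success(len) => resultFuture.complete(Iterable((input, len)))
      case Failure(err) => resultFuture.completeExceptionally(err)
    }
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // orderedWait preserves order per subtask; one subtask keeps it global

val words = env.fromElements("flink", "io", "async")
val enriched: DataStream[(String, Int)] =
  AsyncDataStream.orderedWait(words, new LengthLookup, 1000L, TimeUnit.MILLISECONDS)

val it = enriched.executeAndCollect()
val results = try it.asScala.toList finally it.close()
```

unorderedWait takes the same arguments and can emit each result as soon as it completes, trading ordering for lower latency.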

See Async I/O Operations (async-io.md) for the full reference.

Processing Functions

Low-level processing functions for complex event-driven logic with access to timers, state, and side outputs.

abstract class ProcessFunction[I, O] {
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}

abstract class KeyedProcessFunction[K, I, O] {
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}
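Side outputs are one of the features these functions expose. The sketch below, under the same assumptions, routes even numbers to the main output and odd numbers to a side output identified by an OutputTag; SplitEvenOdd and the tag name "odd" are illustrative.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Illustrative function: evens to the main output, odds to the side output.
class SplitEvenOdd(oddTag: OutputTag[Int]) extends ProcessFunction[Int, Int] {
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, Int]#Context,
      out: Collector[Int]): Unit =
    if (value % 2 == 0) out.collect(value) else ctx.output(oddTag, value)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val oddTag = OutputTag[Int]("odd")

val evens: DataStream[Int] = env.fromElements(1, 2, 3, 4).process(new SplitEvenOdd(oddTag))
val odds: DataStream[Int] = evens.getSideOutput(oddTag)

// Label both outputs and merge them so a single job collects everything.
val labelled = evens.map(n => s"even:$n").union(odds.map(n => s"odd:$n"))
val it = labelled.executeAndCollect()
val results = try it.asScala.toList.sorted finally it.close()
```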

See Processing Functions (processing-functions.md) for the full reference.

Window Functions

Specialized functions for processing windowed data with access to window metadata and state.

trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit
}
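A ProcessWindowFunction receives the key, a context, and all elements of the window at once. The sketch below assumes the same setup and uses count windows (hence GlobalWindow as the window type); SummarizeWindow is an illustrative name.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Illustrative function: reports key, element count, and sum for each window.
class SummarizeWindow extends ProcessWindowFunction[(String, Int), String, String, GlobalWindow] {
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Int)],
      out: Collector[String]): Unit =
    out.collect(s"$key: ${elements.size} elements, sum ${elements.map(_._2).sum}")
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val summaries: DataStream[String] = env
  .fromElements(("a", 1), ("a", 2), ("b", 5), ("b", 5))
  .keyBy(_._1)
  .countWindow(2)
  .process(new SummarizeWindow)

val it = summaries.executeAndCollect()
val results = try it.asScala.toList finally it.close()
```

Unlike aggregate, process buffers every element of the window until it fires, in exchange for full access to the elements and window metadata.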

See Window Functions (window-functions.md) for the full reference.

Sinks and Output

Output operations for writing processed data to external systems and monitoring stream results.

class DataStream[T] {
  def print(): DataStreamSink[T]
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def sinkTo(sink: Sink[T]): DataStreamSink[T]
  def executeAndCollect(): CloseableIterator[T]
}

trait SinkFunction[T] {
  def invoke(value: T, context: Context): Unit
}
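A custom SinkFunction only needs invoke. The sketch below follows a common testing pattern: it appends every value to a JVM-wide queue. It assumes local, in-JVM execution (the queue is not shared across TaskManager processes); CollectingSink is an illustrative name.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

// Illustrative test sink: only meaningful when the job runs in the same JVM.
object CollectingSink {
  val values = new ConcurrentLinkedQueue[Int]()
}

class CollectingSink extends SinkFunction[Int] {
  override def invoke(value: Int, context: SinkFunction.Context): Unit =
    CollectingSink.values.add(value)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3).map(_ * 10).addSink(new CollectingSink)
env.execute("Sink Sketch")

// Parallel sink instances may append in any order, so sort before inspecting.
val collected = CollectingSink.values.asScala.toList.sorted
```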

See Sinks and Output (sinks-output.md) for the full reference.

Types

// Core execution types
class JobExecutionResult {
  def getNetRuntime: Long // net execution time in milliseconds
  def getAccumulatorResult[V](accumulatorName: String): V
}

// Time type (used for window sizes and allowed lateness)
class Time(val duration: Long, val unit: TimeUnit)
object Time {
  def milliseconds(milliseconds: Long): Time
  def seconds(seconds: Long): Time
  def minutes(minutes: Long): Time
  def hours(hours: Long): Time
  def days(days: Long): Time
}

// Type information for Scala types
trait TypeInformation[T] {
  def getTypeClass: Class[T]
  def isBasicType: Boolean
  def isTupleType: Boolean
}

// Output tag for side outputs
case class OutputTag[T: TypeInformation](id: String)

// Iterator returned by executeAndCollect(); a java.util.Iterator that must be closed
trait CloseableIterator[T] extends java.util.Iterator[T] with AutoCloseable
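The implicit TypeInformation[T] context bounds used throughout the API are filled in by the createTypeInformation macro, which the wildcard import brings into scope. A minimal sketch of calling it directly, assuming the same dependencies:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Derived at compile time by the macro from the wildcard import.
val tupleInfo: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]
val stringInfo: TypeInformation[String] = createTypeInformation[String]

println(tupleInfo.isTupleType) // Scala tuples and case classes are treated as tuple-like
println(stringInfo.isBasicType)
```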