Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-streaming-scala-2-12@1.20.0
Apache Flink Streaming Scala provides idiomatic Scala bindings for Apache Flink's streaming data processing capabilities. It wraps Flink's Java streaming API with elegant Scala constructs, enabling developers to create high-throughput, low-latency stream processing applications using familiar Scala syntax and type safety.
⚠️ Deprecation Notice: All Flink Scala APIs have been deprecated since version 1.18.0 and will be removed in a future Flink major version. Applications can still be written in Scala, but they should migrate to the Java DataStream API or the Table API. See FLIP-265 for details.
Maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.20.2</version>
</dependency>
SBT:
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.20.2"
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function._
For specific functionality:
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala.async.AsyncDataStream
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create data source
val numbers = env.fromElements(1, 2, 3, 4, 5)
// Transform data
val doubled = numbers
.map(_ * 2)
.filter(_ > 5)
// Output results
doubled.print()
// Execute the job
env.execute("Simple Flink Job")
Flink Streaming Scala is built around several key components:
- StreamExecutionEnvironment serves as the entry point for creating streaming applications
- DataStream[T] represents bounded or unbounded streams of data with type safety
- KeyedStream[T, K] enables partitioned processing and stateful operations
Core environment setup and job execution functionality. The entry point for all Flink streaming applications.
object StreamExecutionEnvironment {
def getExecutionEnvironment: StreamExecutionEnvironment
def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}
class StreamExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def enableCheckpointing(interval: Long): StreamExecutionEnvironment
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
}
Core data stream operations including creation, transformation, and partitioning. The foundation for all stream processing operations.
class StreamExecutionEnvironment {
def fromElements[T: TypeInformation](data: T*): DataStream[T]
def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
def readTextFile(filePath: String): DataStream[String]
def socketTextStream(hostname: String, port: Int): DataStream[String]
}
class DataStream[T] {
def map[R: TypeInformation](fun: T => R): DataStream[R]
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
def filter(fun: T => Boolean): DataStream[T]
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
}
Partitioned stream processing with state management and aggregations. Essential for stateful computations and windowed operations.
class KeyedStream[T, K] {
def reduce(fun: (T, T) => T): DataStream[T]
def sum(position: Int): DataStream[T]
def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
def process[R: TypeInformation](processFunction: KeyedProcessFunction[K, T, R]): DataStream[R]
def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
}
Time and count-based windowing for bounded computations on infinite streams. Supports various window types and aggregation functions.
class WindowedStream[T, K, W <: Window] {
def reduce(function: (T, T) => T): DataStream[T]
def aggregate[ACC: TypeInformation, R: TypeInformation](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}
Multi-stream operations including unions, connects, joins, and co-processing. Enables complex multi-input stream processing patterns.
class DataStream[T] {
def union(dataStreams: DataStream[T]*): DataStream[T]
def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}
class ConnectedStreams[IN1, IN2] {
def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
High-performance async operations for external service calls without blocking stream processing. Essential for enriching streams with external data.
object AsyncDataStream {
def unorderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
asyncFunction: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit
): DataStream[OUT]
def orderedWait[IN, OUT: TypeInformation](
input: DataStream[IN],
asyncFunction: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit
): DataStream[OUT]
}
trait AsyncFunction[IN, OUT] {
def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
}
Low-level processing functions for complex event-driven logic with access to timers, state, and side outputs.
abstract class ProcessFunction[I, O] {
def processElement(value: I, ctx: Context, out: Collector[O]): Unit
def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}
abstract class KeyedProcessFunction[K, I, O] {
def processElement(value: I, ctx: Context, out: Collector[O]): Unit
def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}
Specialized functions for processing windowed data with access to window metadata and state.
trait WindowFunction[IN, OUT, KEY, W <: Window] {
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
def clear(context: Context): Unit
}
Output operations for writing processed data to external systems and monitoring stream results.
class DataStream[T] {
def print(): DataStreamSink[T]
def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
def sinkTo(sink: Sink[T]): DataStreamSink[T]
def executeAndCollect(): CloseableIterator[T]
}
trait SinkFunction[T] {
def invoke(value: T, context: Context): Unit
}
// Core execution types
trait JobExecutionResult {
def getJobExecutionTime: Long
def getAccumulatorResult[V](accumulatorName: String): V
}
// Time and watermark types
class Time(val duration: Long, val unit: TimeUnit)
object Time {
def milliseconds(milliseconds: Long): Time
def seconds(seconds: Long): Time
def minutes(minutes: Long): Time
def hours(hours: Long): Time
def days(days: Long): Time
}
// Type information for Scala types
trait TypeInformation[T] {
def getTypeClass: Class[T]
def isBasicType: Boolean
def isTupleType: Boolean
}
// Output tag for side outputs
case class OutputTag[T: TypeInformation](id: String)
// Iterator for collecting results
trait CloseableIterator[T] extends Iterator[T] with AutoCloseable