Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
Apache Flink Streaming Scala provides idiomatic Scala bindings for Apache Flink's streaming data processing capabilities. It wraps Flink's Java streaming API with elegant Scala constructs, enabling developers to create high-throughput, low-latency stream processing applications using familiar Scala syntax and type safety.
⚠️ Deprecation Notice: All Flink Scala APIs are deprecated as of Flink 1.18.0 and will be removed in a future major version. Applications can still be written in Scala by calling the Java DataStream API or Table API, which is the recommended migration path. See FLIP-265 for details.
Maven:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>1.20.2</version>
</dependency>
SBT:
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.20.2"
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function._
For specific functionality:
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala.async.AsyncDataStream
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create data source
val numbers = env.fromElements(1, 2, 3, 4, 5)
// Transform data
val doubled = numbers
  .map(_ * 2)
  .filter(_ > 5)
// Output results
doubled.print()
// Execute the job
env.execute("Simple Flink Job")
Flink Streaming Scala is built around several key components:
StreamExecutionEnvironment serves as the entry point for creating streaming applications
DataStream[T] represents bounded or unbounded streams of data with type safety
KeyedStream[T, K] enables partitioned processing and stateful operations
Core environment setup and job execution functionality. The entry point for all Flink streaming applications.
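As a rough illustration of environment setup, the sketch below obtains an environment, sets a default parallelism, and enables checkpointing before running a trivial job. The object name `EnvironmentSetupSketch`, the helper `configuredEnv`, and the chosen values (parallelism 2, a 10 second checkpoint interval) are assumptions made for the example, not recommendations.

```scala
import org.apache.flink.streaming.api.scala._

object EnvironmentSetupSketch {
  // Hypothetical helper: builds an environment with illustrative settings.
  def configuredEnv(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)           // default parallelism for operators
    env.enableCheckpointing(10000L) // checkpoint interval in milliseconds
    env
  }

  def main(args: Array[String]): Unit = {
    val env = configuredEnv()
    env.fromElements("a", "b", "c").print()
    // Nothing runs until execute() is called.
    env.execute("Environment Setup Sketch")
  }
}
```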
object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}
class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
Core data stream operations including creation, transformation, and partitioning. The foundation for all stream processing operations.
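A minimal sketch of creating a stream from a collection and chaining `flatMap`, `filter`, and `map` might look like the following. The object name `TransformSketch`, the `run` helper, and the sample sentences are made up for illustration; parallelism is set to 1 only so that the collected order matches the input order.

```scala
import org.apache.flink.streaming.api.scala._

object TransformSketch {
  // Hypothetical helper: runs the pipeline locally and returns its output.
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val lines = env.fromCollection(Seq("to be", "or not to be"))
    val pairs = lines
      .flatMap((line: String) => line.split(" ").toSeq) // one element per word
      .filter(word => word != "or")                     // drop a word
      .map(word => (word, word.length))                 // pair word with length

    // executeAndCollect() runs the job and streams results back to the client.
    val results = pairs.executeAndCollect()
    try results.toList finally results.close()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```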
class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def readTextFile(filePath: String): DataStream[String]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
}
class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def filter(fun: T => Boolean): DataStream[T]
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
}
Partitioned stream processing with state management and aggregations. Essential for stateful computations and windowed operations.
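To make the keyed aggregation concrete, here is a small sketch that partitions tuples by their first field and keeps a rolling sum of the second. The names `KeyedSketch` and `run` and the sample data are assumptions for the example. Note that a rolling aggregate emits one updated result per input element, not one final value per key.

```scala
import org.apache.flink.streaming.api.scala._

object KeyedSketch {
  // Hypothetical helper: runs the pipeline locally and returns its output.
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // keeps the collected order equal to input order

    val events = env.fromElements(("a", 1), ("b", 2), ("a", 3), ("b", 4))
    val totals = events
      .keyBy(_._1) // partition by the String key
      .sum(1)      // rolling sum over tuple position 1, kept per key

    val results = totals.executeAndCollect()
    try results.toList finally results.close()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```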
class KeyedStream[T, K] {
  def reduce(fun: (T, T) => T): DataStream[T]
  def sum(position: Int): DataStream[T]
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
  def process[R: TypeInformation](processFunction: KeyedProcessFunction[K, T, R]): DataStream[R]
  def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
}
Time and count-based windowing for bounded computations on infinite streams. Supports various window types and aggregation functions.
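A count-based window is the easiest kind to try with a small bounded input, so the sketch below uses `countWindow(2)` with a `reduce`. The names `CountWindowSketch` and `run` and the sample data are assumptions for the example. A time-based variant would pass a window assigner to `window(...)` instead, but processing-time windows may not fire before a short bounded job finishes.

```scala
import org.apache.flink.streaming.api.scala._

object CountWindowSketch {
  // Hypothetical helper: runs the pipeline locally and returns its output.
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val readings = env.fromElements(("a", 1), ("a", 2), ("b", 10), ("a", 3), ("b", 20))
    val windowed = readings
      .keyBy(_._1)
      .countWindow(2)                        // fire after every 2 elements per key
      .reduce((x, y) => (x._1, x._2 + y._2)) // sum the values inside each window

    // The third "a" element sits in an incomplete window and is never emitted.
    val results = windowed.executeAndCollect()
    try results.toList finally results.close()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```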
class WindowedStream[T, K, W <: Window] {
  def reduce(function: (T, T) => T): DataStream[T]
  def aggregate[ACC: TypeInformation, R: TypeInformation](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}
Multi-stream operations including unions, connects, joins, and co-processing. Enables complex multi-input stream processing patterns.
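As one possible illustration, the sketch below connects two streams of different element types and maps each side to a common output type. The names `ConnectSketch` and `run` and the sample values are assumptions for the example. The relative order in which the two inputs are consumed is not guaranteed, so the helper sorts the collected output.

```scala
import org.apache.flink.streaming.api.scala._

object ConnectSketch {
  // Hypothetical helper: runs the pipeline locally and returns sorted output.
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val ints = env.fromElements(1, 2)
    val strs = env.fromElements("x")

    // connect keeps both element types; map supplies one function per input.
    val merged = ints
      .connect(strs)
      .map((i: Int) => s"int:$i", (s: String) => s"str:$s")

    val results = merged.executeAndCollect()
    try results.toList.sorted finally results.close()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

`union` is the simpler choice when all inputs already share one element type.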
class DataStream[T] {
  def union(dataStreams: DataStream[T]*): DataStream[T]
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}
class ConnectedStreams[IN1, IN2] {
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
High-performance async operations for external service calls without blocking stream processing. Essential for enriching streams with external data.
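The following sketch shows the general shape of an async enrichment step. `UppercaseLookup` is a hypothetical function whose `Future` stands in for a real non-blocking client call; the names `AsyncSketch` and `run`, the 5 second timeout, and the sample data are likewise assumptions. Because `unorderedWait` may emit results out of input order, the helper sorts what it collects.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncDataStream, AsyncFunction, ResultFuture}

// Hypothetical enrichment: the Future simulates an external service call.
class UppercaseLookup extends AsyncFunction[String, String] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    Future(input.toUpperCase).onComplete {
      case Success(value) => resultFuture.complete(Iterable(value))
      case Failure(error) => resultFuture.completeExceptionally(error)
    }
  }
}

object AsyncSketch {
  // Hypothetical helper: runs the pipeline locally and returns sorted output.
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val input = env.fromElements("a", "b", "c")
    val enriched = AsyncDataStream.unorderedWait(
      input, new UppercaseLookup, 5, TimeUnit.SECONDS)

    val results = enriched.executeAndCollect()
    try results.toList.sorted finally results.close()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```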
object AsyncDataStream {
  def unorderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN],
    asyncFunction: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]
  def orderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN],
    asyncFunction: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]
}
trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
}
Low-level processing functions for complex event-driven logic with access to timers, state, and side outputs.
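Of the capabilities listed, side outputs are the simplest to show in a few lines. The sketch below routes even numbers to the main output and odd numbers to a side output. `SplitByParity`, `SideOutputSketch`, `run`, and the tag id `"odd"` are hypothetical names chosen for the example; timers and keyed state would additionally require a `KeyedProcessFunction` on a keyed stream.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical function: evens go to the main output, odds to the side output.
class SplitByParity(oddTag: OutputTag[Int]) extends ProcessFunction[Int, Int] {
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, Int]#Context,
      out: Collector[Int]): Unit =
    if (value % 2 == 0) out.collect(value) else ctx.output(oddTag, value)
}

object SideOutputSketch {
  // Hypothetical helper: returns the main output; the side output is printed.
  def run(): List[Int] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val oddTag = OutputTag[Int]("odd")
    val evens = env.fromElements(1, 2, 3, 4).process(new SplitByParity(oddTag))

    // The side output is read from the stream returned by process().
    evens.getSideOutput(oddTag).print()

    val results = evens.executeAndCollect()
    try results.toList finally results.close()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```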
abstract class ProcessFunction[I, O] {
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}
abstract class KeyedProcessFunction[K, I, O] {
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}
Specialized functions for processing windowed data with access to window metadata and state.
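A sketch of a `ProcessWindowFunction` that sees all elements of a window at once could look like this. `SummarizeWindow`, `WindowFunctionSketch`, `run`, and the sample data are assumptions for the example, and a count window is used so the result is reproducible with a small bounded input.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical function: emits one summary line per fired window.
class SummarizeWindow
    extends ProcessWindowFunction[(String, Int), String, String, GlobalWindow] {
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Int)],
      out: Collector[String]): Unit = {
    val values = elements.map(_._2)
    out.collect(s"$key: count=${values.size} sum=${values.sum}")
  }
}

object WindowFunctionSketch {
  // Hypothetical helper: runs the pipeline locally and returns its output.
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val summaries = env
      .fromElements(("a", 1), ("a", 2), ("b", 5))
      .keyBy(_._1)
      .countWindow(2) // only key "a" reaches 2 elements
      .process(new SummarizeWindow)

    val results = summaries.executeAndCollect()
    try results.toList finally results.close()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Buffering every element has a memory cost, so `reduce` or `aggregate` is usually preferable when the window metadata is not needed.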
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit
}
Output operations for writing processed data to external systems and monitoring stream results.
class DataStream[T] {
  def print(): DataStreamSink[T]
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def sinkTo(sink: Sink[T]): DataStreamSink[T]
  def executeAndCollect(): CloseableIterator[T]
}
trait SinkFunction[T] {
  def invoke(value: T, context: Context): Unit
}
// Core execution types
class JobExecutionResult {
  def getNetRuntime: Long
  def getAccumulatorResult[V](accumulatorName: String): V
}
// Time and watermark types
class Time(val duration: Long, val unit: TimeUnit)
object Time {
  def milliseconds(milliseconds: Long): Time
  def seconds(seconds: Long): Time
  def minutes(minutes: Long): Time
  def hours(hours: Long): Time
  def days(days: Long): Time
}
// Type information for Scala types
trait TypeInformation[T] {
  def getTypeClass: Class[T]
  def isBasicType: Boolean
  def isTupleType: Boolean
}
// Output tag for side outputs
case class OutputTag[T: TypeInformation](id: String)
// Iterator for collecting results
trait CloseableIterator[T] extends Iterator[T] with AutoCloseable