Apache Flink Streaming Scala API provides elegant and fluent Scala APIs for building high-throughput, low-latency stream processing applications with fault tolerance and exactly-once processing guarantees. This library wraps Flink's Java DataStream API with Scala-idiomatic interfaces, offering type-safe streaming data processing with functional programming constructs.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-streaming-scala_2-11@1.14.0
pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
For SBT:
libraryDependencies += "org.apache.flink" % "flink-streaming-scala_2.11" % "1.14.6"
Core imports:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
Basic usage:
import org.apache.flink.streaming.api.scala._
// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create a data stream from a collection
val dataStream = env.fromCollection(List(1, 2, 3, 4, 5))
// Transform the data
val result = dataStream
  .map(_ * 2)
  .filter(_ > 5)
  .keyBy(identity)
  .sum(0)
// Add sink and execute
result.print()
env.execute("Basic Flink Job")
The Flink Streaming Scala API is built around several key components:
StreamExecutionEnvironment is the main entry point for creating streaming applications and configuring execution parameters such as parallelism, checkpointing, and state backends.
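As a minimal sketch of environment setup, the snippet below obtains an environment from the companion object, sets a parallelism, and enables checkpointing. The object name `EnvironmentSetupSketch`, the parallelism of 2, and the 10-second interval are illustrative assumptions, and a local run is assumed to have `flink-clients` on the classpath.

```scala
import org.apache.flink.streaming.api.scala._

object EnvironmentSetupSketch {
  // Builds an environment with illustrative settings (the values are assumptions).
  def configuredEnv(): StreamExecutionEnvironment = {
    // getExecutionEnvironment is a factory method on the companion object
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    // Checkpoint interval is given in milliseconds
    env.enableCheckpointing(10000L)
    env
  }

  def main(args: Array[String]): Unit = {
    val env = configuredEnv()
    env.fromElements("a", "b", "c").print()
    env.execute("Environment Setup Sketch")
  }
}
```

Because `enableCheckpointing` returns the environment, it can also be chained with further configuration calls.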
object StreamExecutionEnvironment {
  // Factory method is defined on the companion object
  def getExecutionEnvironment: StreamExecutionEnvironment
}
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
  def setParallelism(parallelism: Int): Unit
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
}
DataStream provides the core streaming operations for transforming, filtering, and processing unbounded data streams with type safety and functional programming patterns.
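A short sketch of these operations, combining `union`, `map`, and `filter` on two small bounded streams. The object name and input values are illustrative, and `executeAndCollect` is used only to bring the results back to the caller for inspection.

```scala
import org.apache.flink.streaming.api.scala._

object DataStreamSketch {
  def run(): List[Int] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val small = env.fromElements(1, 2, 3)
    val large = env.fromElements(10, 20, 30)

    small
      .union(large)          // merge both streams of the same type
      .map(_ * 2)            // transform each element
      .filter(_ > 5)         // keep only elements above the threshold
      .executeAndCollect(10) // run the job and fetch up to 10 results
  }
}
```

The relative order of elements coming from the two unioned inputs is not guaranteed, so callers that compare results should sort them first.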
class DataStream[T] {
  def map[R](mapper: T => R): DataStream[R]
  def filter(predicate: T => Boolean): DataStream[T]
  def keyBy[K](keySelector: T => K): KeyedStream[T, K]
  def union(otherStreams: DataStream[T]*): DataStream[T]
}
KeyedStream provides partitioned stream operations that enable stateful computations, aggregations, and key-based processing with state management.
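As an illustration of keyed, stateful processing, the sketch below keeps a running total per key with `reduce`. The object name and the sample tuples are assumptions made for the example.

```scala
import org.apache.flink.streaming.api.scala._

object KeyedStreamSketch {
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env
      .fromElements(("a", 1), ("b", 1), ("a", 1))
      .keyBy(_._1)                            // partition by the first tuple field
      .reduce((x, y) => (x._1, x._2 + y._2))  // rolling sum kept in keyed state
      .executeAndCollect(10)
  }
}
```

A rolling reduce emits an updated value for every incoming element, so the output contains intermediate totals as well as the final one for each key.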
class KeyedStream[T, K] {
  def sum(field: Int): DataStream[T]
  def reduce(reducer: (T, T) => T): DataStream[T]
  def window[W <: Window](assigner: WindowAssigner[T, W]): WindowedStream[T, K, W]
  def process[R](function: KeyedProcessFunction[K, T, R]): DataStream[R]
}
WindowedStream groups stream elements by time or by count so that aggregations and computations run over bounded sets of data.
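A minimal count-based sketch: each key collects two elements per window and reduces them to a sum. The object name and sample data are illustrative. A count window is used here because it behaves deterministically on a small bounded input; a time-based variant would pass an assigner such as `TumblingEventTimeWindows.of(Time.seconds(5))` to `window` and needs timestamps assigned on the stream.

```scala
import org.apache.flink.streaming.api.scala._

object CountWindowSketch {
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env
      .fromElements(("a", 1), ("a", 2), ("b", 5), ("a", 3))
      .keyBy(_._1)
      .countWindow(2)                         // fire once a key has 2 elements
      .reduce((x, y) => (x._1, x._2 + y._2))  // sum the values inside each window
      .executeAndCollect(10)
  }
}
```

Only key `a` fills a complete window in this input; the single `b` element and the third `a` element stay in windows that never reach the count and therefore produce no output.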
class WindowedStream[T, K, W <: Window] {
  def reduce(reducer: (T, T) => T): DataStream[T]
  def aggregate[ACC, R](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
  def apply[R](function: WindowFunction[T, R, K, W]): DataStream[R]
}
JoinedStreams provides operations for combining multiple data streams based on keys, time windows, or custom join conditions.
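The sketch below joins two bounded streams on a shared key inside a tumbling event-time window. The stream contents, field layout, and object name are hypothetical; timestamps are taken from the third tuple field with `assignAscendingTimestamps`, which assumes they arrive in ascending order.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object JoinSketch {
  def run(): List[(String, Int, String)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // (orderId, amount, eventTimeMillis)
    val orders = env
      .fromElements(("o1", 100, 1000L), ("o2", 250, 2000L))
      .assignAscendingTimestamps(_._3)

    // (orderId, paymentMethod, eventTimeMillis)
    val payments = env
      .fromElements(("o1", "card", 1500L), ("o2", "cash", 2500L), ("o3", "card", 3000L))
      .assignAscendingTimestamps(_._3)

    orders
      .join(payments)
      .where(_._1)   // key of the first stream
      .equalTo(_._1) // key of the second stream
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply((o: (String, Int, Long), p: (String, String, Long)) => (o._1, o._2, p._2))
      .executeAndCollect(10)
  }
}
```

This is an inner join per window: the payment for `o3` has no matching order and is dropped.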
class JoinedStreams[T1, T2] {
  def where[KEY](keySelector: T1 => KEY): Where[KEY]
  class Where[KEY] {
    def equalTo(keySelector: T2 => KEY): EqualTo
    class EqualTo {
      def window[W <: Window](assigner: WindowAssigner[TaggedUnion[T1, T2], W]): WithWindow[W]
    }
  }
}
AsyncDataStream provides asynchronous I/O operations for non-blocking interaction with external systems, with configurable timeouts and capacity management.
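A hedged sketch of asynchronous enrichment follows. `LengthLookup` is a hypothetical stand-in for a call to an external system: it computes a string length on another thread and completes the `ResultFuture` when done. The one-second timeout is an arbitrary choice for the example.

```scala
import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical lookup standing in for a remote request.
class LengthLookup extends AsyncFunction[String, (String, Int)] {
  override def asyncInvoke(input: String, resultFuture: ResultFuture[(String, Int)]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    Future(input.length).onComplete {
      case Success(len) => resultFuture.complete(Iterable((input, len)))
      case Failure(err) => resultFuture.completeExceptionally(err)
    }
  }
}

object AsyncSketch {
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val words = env.fromElements("flink", "stream")
    AsyncDataStream
      .unorderedWait(words, new LengthLookup, 1000L, TimeUnit.MILLISECONDS)
      .executeAndCollect(10)
  }
}
```

With `unorderedWait`, results are emitted as soon as each request completes, so output order may differ from input order.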
object AsyncDataStream {
  def unorderedWait[IN, OUT](
    stream: DataStream[IN],
    function: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]
}
Function interfaces support custom processing logic, including window functions, process functions, and rich functions with lifecycle management.
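As a small sketch of a custom function, the class below extends `ProcessFunction` and emits a label for even inputs only. `EvenLabeler` and the sample input are illustrative names and values. Note that from Scala the context parameter is typed as `ProcessFunction[Int, String]#Context`, because `Context` is an inner class of the Java `ProcessFunction`.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Emits zero or one output per input, which map or filter alone cannot express together.
class EvenLabeler extends ProcessFunction[Int, String] {
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, String]#Context,
      out: Collector[String]): Unit = {
    if (value % 2 == 0) out.collect(s"even:$value")
  }
}

object ProcessFunctionSketch {
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env
      .fromElements(1, 2, 3, 4)
      .process(new EvenLabeler)
      .executeAndCollect(10)
  }
}
```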
// ProcessFunction is an abstract class in Flink's Java API; Context is its inner class
abstract class ProcessFunction[I, O] {
  def processElement(value: I, ctx: ProcessFunction[I, O]#Context, out: Collector[O]): Unit
}
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}
// Core stream types
class DataStream[T]
class KeyedStream[T, K]
class WindowedStream[T, K, W <: Window]
class AllWindowedStream[T, W <: Window]
class ConnectedStreams[T1, T2]
class BroadcastConnectedStream[IN1, IN2]
// Environment and configuration
class StreamExecutionEnvironment
class ExecutionConfig
class CheckpointConfig
// Output and utility types
class OutputTag[T]
trait CloseableIterator[T]
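To show two of the utility types together, the following sketch routes odd numbers to a side output identified by an `OutputTag` and reads that side output through the iterator returned by `executeAndCollect()`. The class `ParitySplitter`, the tag id `odd-numbers`, and the input values are assumptions made for illustration.

```scala
import scala.collection.mutable.ListBuffer

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Even numbers go to the main output, odd numbers to the tagged side output.
class ParitySplitter(oddTag: OutputTag[Int]) extends ProcessFunction[Int, Int] {
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, Int]#Context,
      out: Collector[Int]): Unit = {
    if (value % 2 == 0) out.collect(value) else ctx.output(oddTag, value)
  }
}

object SideOutputSketch {
  def run(): List[Int] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val oddTag = OutputTag[Int]("odd-numbers")
    val evens = env.fromElements(1, 2, 3, 4, 5).process(new ParitySplitter(oddTag))

    // executeAndCollect() runs the job and returns a closeable iterator over the results
    val iter = evens.getSideOutput(oddTag).executeAndCollect()
    val collected = ListBuffer.empty[Int]
    try {
      while (iter.hasNext) collected += iter.next()
    } finally {
      iter.close() // release the resources backing the iterator
    }
    collected.toList
  }
}
```

Closing the iterator in a `finally` block matters because it holds job resources until it is released.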