Apache Flink Streaming Scala API provides elegant, fluent Scala APIs for building high-throughput, low-latency stream processing applications with fault tolerance and exactly-once processing guarantees.
This library wraps Flink's Java DataStream API with Scala-idiomatic interfaces, offering type-safe stream processing built on functional programming constructs.
For Maven, in pom.xml:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
For SBT:
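libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.14.6"
With %%, SBT appends the project's Scala binary version (for example _2.11 when scalaVersion is 2.11.x) to the artifact name, so the suffix is not written by hand.
Core imports (the wildcard import also brings the implicit TypeInformation derivation into scope):
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
Quick start:
import org.apache.flink.streaming.api.scala._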
// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create a data stream from a collection
val dataStream = env.fromCollection(List(1, 2, 3, 4, 5))
// Transform the data
val result = dataStream
.map(_ * 2)
.filter(_ > 5)
.keyBy(x => x) // key each element by its own value
.sum(0)
// Add sink and execute
result.print()
env.execute("Basic Flink Job")
The Flink Streaming Scala API is built around several key components:
Main entry point for creating streaming applications and configuring execution parameters like parallelism, checkpointing, and state backends.
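As a minimal sketch of that configuration, assuming Flink 1.14 with flink-streaming-scala on the classpath, the environment here gets a default parallelism, exactly-once checkpointing, and the heap-based HashMapStateBackend. The object name EnvConfigSketch and every numeric value are illustrative assumptions, not recommended settings.

```scala
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

// Hypothetical helper; the settings below are illustrative values only.
object EnvConfigSketch {
  def configuredEnv(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Default parallelism for operators that do not set their own
    env.setParallelism(2)
    // Trigger an exactly-once checkpoint every 10 seconds
    env.enableCheckpointing(10000L, CheckpointingMode.EXACTLY_ONCE)
    // Wait at least 500 ms after one checkpoint completes before starting the next
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L)
    // Keep working state as Java objects on the TaskManager heap
    env.setStateBackend(new HashMapStateBackend())
    env
  }
}
```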
class StreamExecutionEnvironment(javaEnv: JavaEnv) {
def setParallelism(parallelism: Int): Unit
def enableCheckpointing(interval: Long): StreamExecutionEnvironment
}
object StreamExecutionEnvironment {
def getExecutionEnvironment: StreamExecutionEnvironment
}
Core streaming operations for transforming, filtering, and processing unbounded data streams with type safety and functional programming patterns.
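A small sketch of these operations on bounded input, assuming Flink 1.14 and a local environment: union merges two streams of the same element type, then filter and map transform the elements. DataStreamOpsSketch is a hypothetical name, and executeAndCollect is used only to bring the results back to the client for inspection.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical driver object for illustration.
object DataStreamOpsSketch {
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val left: DataStream[String] = env.fromElements("apple", "kiwi")
    val right: DataStream[String] = env.fromElements("banana", "fig")
    val result: DataStream[String] = left
      .union(right)          // both inputs must have the same element type
      .filter(_.length > 3)  // drop words of three characters or fewer
      .map(_.toUpperCase)    // transform each remaining element
    // executeAndCollect runs the job and streams results back to the client
    val it = result.executeAndCollect()
    // union defines no order between its inputs, so sort before returning
    try it.toList.sorted finally it.close()
  }
}
```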
class DataStream[T] {
def map[R: TypeInformation](mapper: T => R): DataStream[R]
def filter(predicate: T => Boolean): DataStream[T]
def keyBy[K: TypeInformation](keySelector: T => K): KeyedStream[T, K]
def union(otherStreams: DataStream[T]*): DataStream[T]
}
Partitioned (keyed) stream operations for stateful computations, aggregations, and per-key processing backed by managed state.
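One way to keep per-key state is a KeyedProcessFunction holding a ValueState; this sketch counts how often each key has been seen. It assumes Flink 1.14, and RunningCount and KeyedStateSketch are hypothetical names. The state is declared as java.lang.Long so that a key seen for the first time reads as null rather than a default value.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical function: emits (word, number of times the word has been seen so far).
class RunningCount extends KeyedProcessFunction[String, String, (String, Long)] {
  // Flink scopes the value of this state handle to the current key
  @transient private var count: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit =
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("count", classOf[java.lang.Long]))

  override def processElement(
      word: String,
      ctx: KeyedProcessFunction[String, String, (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    // value() returns null the first time a key is seen
    val updated = Option(count.value()).map(_.longValue).getOrElse(0L) + 1
    count.update(updated)
    out.collect((word, updated))
  }
}

// Hypothetical driver object for illustration.
object KeyedStateSketch {
  def run(): List[(String, Long)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val it = env
      .fromElements("a", "b", "a", "a")
      .keyBy(w => w)
      .process(new RunningCount)
      .executeAndCollect()
    try it.toList finally it.close()
  }
}
```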
class KeyedStream[T, K] {
def sum(field: Int): DataStream[T]
def reduce(reducer: (T, T) => T): DataStream[T]
def window[W <: Window](assigner: WindowAssigner[T, W]): WindowedStream[T, K, W]
def process[R: TypeInformation](function: KeyedProcessFunction[K, T, R]): DataStream[R]
}
Time-based and count-based windows that group stream elements into finite sets for aggregations and other computations.
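A minimal windowing sketch, assuming Flink 1.14's WatermarkStrategy API and event timestamps (epoch milliseconds) carried in the second tuple field: events are keyed by sensor id, grouped into 10-second tumbling event-time windows, and averaged with an AggregateFunction. SensorTimestamp, AverageAggregate, WindowSketch, and the sample data are hypothetical. On bounded input, the final watermark emitted at end of input fires any windows still open.

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical assigner: the second field of (sensorId, timestampMillis, value) is the event time.
class SensorTimestamp extends SerializableTimestampAssigner[(String, Long, Double)] {
  override def extractTimestamp(e: (String, Long, Double), recordTs: Long): Long = e._2
}

// Hypothetical aggregate: the accumulator is (running sum, element count).
class AverageAggregate extends AggregateFunction[(String, Long, Double), (Double, Long), Double] {
  override def createAccumulator(): (Double, Long) = (0.0, 0L)
  override def add(e: (String, Long, Double), acc: (Double, Long)): (Double, Long) =
    (acc._1 + e._3, acc._2 + 1)
  override def getResult(acc: (Double, Long)): Double = acc._1 / acc._2
  override def merge(a: (Double, Long), b: (Double, Long)): (Double, Long) =
    (a._1 + b._1, a._2 + b._2)
}

// Hypothetical driver object for illustration.
object WindowSketch {
  def run(): List[Double] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Timestamps in the sample data arrive in ascending order
    val strategy = WatermarkStrategy
      .forMonotonousTimestamps[(String, Long, Double)]()
      .withTimestampAssigner(new SensorTimestamp)
    val it = env
      .fromElements(("sensor-1", 1000L, 10.0), ("sensor-1", 4000L, 20.0), ("sensor-1", 12000L, 30.0))
      .assignTimestampsAndWatermarks(strategy)
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .aggregate(new AverageAggregate)
      .executeAndCollect()
    try it.toList finally it.close()
  }
}
```

The first window [0 s, 10 s) averages 10.0 and 20.0; the second holds only 30.0.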
class WindowedStream[T, K, W <: Window] {
def reduce(reducer: (T, T) => T): DataStream[T]
def aggregate[ACC: TypeInformation, R: TypeInformation](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
}
Operations for combining multiple data streams based on keys, time windows, or custom join conditions.
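A sketch of a windowed equi-join, assuming Flink 1.14 and event timestamps in the second tuple field: users and scores are joined on the user id when both elements fall into the same 10-second tumbling window. TupleTimestamp, WindowJoinSketch, and the data are hypothetical. User u2 finds no partner because its score lands in the following window.

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical helper: uses the second tuple field (epoch millis) as the event time.
class TupleTimestamp[T <: Product] extends SerializableTimestampAssigner[T] {
  override def extractTimestamp(e: T, recordTs: Long): Long =
    e.productElement(1).asInstanceOf[Long]
}

// Hypothetical driver object for illustration.
object WindowJoinSketch {
  private def eventTime[T <: Product]: WatermarkStrategy[T] =
    WatermarkStrategy.forMonotonousTimestamps[T]().withTimestampAssigner(new TupleTimestamp[T])

  def run(): List[(String, String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // (userId, timestampMillis, name)
    val users = env.fromElements(("u1", 1000L, "alice"), ("u2", 2000L, "bob"))
      .assignTimestampsAndWatermarks(eventTime[(String, Long, String)])
    // (userId, timestampMillis, score)
    val scores = env.fromElements(("u1", 3000L, 7), ("u2", 15000L, 9))
      .assignTimestampsAndWatermarks(eventTime[(String, Long, Int)])
    val it = users
      .join(scores)
      .where(_._1)   // key of the left stream
      .equalTo(_._1) // key of the right stream
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .apply { (u, s) => (u._1, u._3, s._3) }
      .executeAndCollect()
    try it.toList finally it.close()
  }
}
```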
class JoinedStreams[T1, T2] {
  def where[KEY: TypeInformation](keySelector: T1 => KEY): Where[KEY]
  class Where[KEY] {
    def equalTo(keySelector: T2 => KEY): EqualTo
    class EqualTo {
      def window[W <: Window](assigner: WindowAssigner[_ >: TaggedUnion[T1, T2], W]): WithWindow[W]
    }
  }
}
Asynchronous I/O operations for non-blocking external system interactions with configurable timeouts and capacity management.
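A minimal async I/O sketch, assuming Flink 1.14 and the Scala AsyncFunction and ResultFuture from org.apache.flink.streaming.api.scala.async. LengthLookup is a hypothetical stand-in for a non-blocking client: it completes a Scala Future on the global execution context instead of calling a real external system. The trailing 10 passed to unorderedWait is the capacity, the maximum number of requests in flight.

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}

// Hypothetical lookup standing in for a non-blocking client of an external system.
class LengthLookup extends AsyncFunction[String, (String, Int)] {
  override def asyncInvoke(word: String, resultFuture: ResultFuture[(String, Int)]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    // The simulated request runs on another thread; the operator thread is not blocked
    Future(word.length).onComplete {
      case Success(len) => resultFuture.complete(Iterable((word, len)))
      case Failure(err) => resultFuture.completeExceptionally(err)
    }
  }
}

// Hypothetical driver object for illustration.
object AsyncSketch {
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val words = env.fromElements("flink", "scala", "io")
    // 1-second timeout per request, at most 10 requests in flight
    val enriched = AsyncDataStream.unorderedWait(words, new LengthLookup, 1000L, TimeUnit.MILLISECONDS, 10)
    val it = enriched.executeAndCollect()
    // unorderedWait emits results in completion order, so sort for a stable result
    try it.toList.sorted finally it.close()
  }
}
```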
object AsyncDataStream {
def unorderedWait[IN, OUT: TypeInformation](
stream: DataStream[IN],
function: AsyncFunction[IN, OUT],
timeout: Long,
timeUnit: TimeUnit
): DataStream[OUT]
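// An overload takes a trailing capacity: Int, the maximum number of in-flight requests
}
Interfaces for creating custom processing functions, including window functions, process functions, and rich functions with lifecycle management (open/close).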
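A sketch of a custom ProcessFunction that splits a stream using a side output (the OutputTag type from the utility types). It assumes Flink 1.14; SplitNegatives and SideOutputSketch are hypothetical names. Both branches are labeled and unioned so that a single job can collect them.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical function: routes negative numbers to a side output, passes the rest through.
class SplitNegatives(negatives: OutputTag[Int]) extends ProcessFunction[Int, Int] {
  override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, out: Collector[Int]): Unit =
    if (value < 0) ctx.output(negatives, value) else out.collect(value)
}

// Hypothetical driver object for illustration.
object SideOutputSketch {
  def run(): List[(String, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val negativesTag = OutputTag[Int]("negatives")
    val main = env.fromElements(3, -1, 4, -5).process(new SplitNegatives(negativesTag))
    val side = main.getSideOutput(negativesTag)
    // Label each branch and merge them so one job collects both
    val it = main.map(v => ("main", v)).union(side.map(v => ("side", v))).executeAndCollect()
    try it.toList.sorted finally it.close()
  }
}
```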
// Java abstract class from the DataStream API, shown in Scala syntax
abstract class ProcessFunction[I, O] {
def processElement(value: I, ctx: ProcessFunction[I, O]#Context, out: Collector[O]): Unit
}
trait WindowFunction[IN, OUT, KEY, W <: Window] {
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}
// Core stream types
class DataStream[T]
class KeyedStream[T, K]
class WindowedStream[T, K, W <: Window]
class AllWindowedStream[T, W <: Window]
class ConnectedStreams[T1, T2]
class BroadcastConnectedStream[IN1, IN2]
// Environment and configuration
class StreamExecutionEnvironment
class ExecutionConfig
class CheckpointConfig
// Output and utility types
class OutputTag[T]
trait CloseableIterator[T]
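Several of these types appear directly when streams are combined. This sketch, assuming Flink 1.14, connects two streams with different element types into a ConnectedStreams, maps each side with its own function, and reads the result through the CloseableIterator returned by executeAndCollect. ConnectedSketch is a hypothetical name.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical driver object for illustration.
object ConnectedSketch {
  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val numbers: DataStream[Int] = env.fromElements(1, 2)
    val words: DataStream[String] = env.fromElements("a")
    // connect keeps both element types; each side is mapped by its own function
    val connected: ConnectedStreams[Int, String] = numbers.connect(words)
    val merged: DataStream[String] =
      connected.map((n: Int) => s"num:$n", (w: String) => s"word:$w")
    // The Scala CloseableIterator should be closed once it has been consumed
    val it: CloseableIterator[String] = merged.executeAndCollect()
    try it.toList.sorted finally it.close()
  }
}
```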