# Apache Flink Scala API

Apache Flink Scala API for DataStream processing: type-safe, functional programming constructs for building streaming data processing applications.

## Installation

```
npx @tessl/cli install tessl/maven-org-apache-flink--flink-streaming-scala_2-10@1.3.0
```

The Apache Flink Scala API provides a type-safe, functional programming interface for building streaming data processing applications. It wraps Flink's Java DataStream API with Scala-friendly constructs, enabling developers to use functional programming patterns, type safety, and expressive syntax for real-time stream processing with exactly-once processing guarantees.

Maven coordinates: `org.apache.flink:flink-streaming-scala_2.10:1.3.3`

## Usage

```scala
import org.apache.flink.streaming.api.scala._
```

For specific functionality:

```scala
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
```

## Basic Example

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create a data stream from a collection
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Transform the data
val result = stream
  .filter(_ % 2 == 0)          // Filter even numbers
  .map(_ * 2)                  // Double each number
  .keyBy(identity)             // Key by value
  .timeWindow(Time.seconds(5)) // 5-second tumbling windows
  .sum(0)                      // Sum values in each window

// Add a sink to print results
result.print()

// Execute the streaming program
env.execute("Basic Flink Scala Example")
```

## Architecture

The Flink Scala API is built around several key architectural components:
### Stream Environment and Execution

Core functionality for creating and configuring Flink streaming applications, including environment setup, parallelism control, checkpointing, and program execution.

```scala
object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(): StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}

class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def enableCheckpointing(interval: Long): Unit
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```
### Data Sources and Stream Creation

Comprehensive functionality for creating DataStreams from various sources including collections, files, sockets, and custom source functions.

```scala
class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def readTextFile(filePath: String): DataStream[String]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]
  def generateSequence(from: Long, to: Long): DataStream[Long]
}
```
### Stream Transformations and Operations

Core stream processing operations including element-wise transformations, filtering, and stateful processing with type-safe functional interfaces.

```scala
class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def filter(fun: T => Boolean): DataStream[T]
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}
```
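Because these transformations take ordinary Scala functions, their per-element semantics mirror the Scala collections API. A minimal plain-Scala sketch (no Flink dependency) of what `map`, `flatMap`, and `filter` do to the elements flowing through a stream:

```scala
// Per-element semantics of the core transformations, sketched on a Seq
val xs = Seq(1, 2, 3, 4)

// map: exactly one output element per input element
val mapped = xs.map(_ * 2)

// flatMap: zero or more output elements per input element
val flattened = xs.flatMap(x => Seq(x, -x))

// filter: keep only elements for which the predicate holds
val evens = xs.filter(_ % 2 == 0)
```

The same lambdas passed here would be passed to the DataStream operators, applied continuously as elements arrive rather than once over a finite collection.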
### Stream Partitioning and Distribution

Functionality for controlling data distribution across parallel operators including key-based partitioning, broadcasting, and custom partitioning strategies.

```scala
class DataStream[T] {
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
  def keyBy(fields: Int*): KeyedStream[T, _]
  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, _]
  def broadcast: DataStream[T]
  def shuffle: DataStream[T]
  def rebalance: DataStream[T]
}
```
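The guarantee `keyBy` provides is that all elements with the same key are routed to the same parallel operator instance. A plain-Scala sketch of that routing idea (the parallelism of 4 and the hash-modulo rule below are illustrative assumptions, not Flink's actual key-group hashing):

```scala
// Illustrative key-based routing: same key => same channel
val parallelism = 4
def channelFor(key: String): Int = math.abs(key.hashCode) % parallelism

val events = Seq(("user-a", 1), ("user-b", 2), ("user-a", 3))
val routed = events.groupBy { case (key, _) => channelFor(key) }

// Every event for "user-a" lands in the same channel
val channelsForA =
  events.collect { case (k, _) if k == "user-a" => channelFor(k) }.distinct
```

This co-location is what makes the per-key state of `KeyedStream` operations possible: each parallel instance only ever sees the complete history for its own keys.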
### Keyed Streams and Stateful Processing

Advanced operations on partitioned streams including stateful transformations, aggregations, and state management with exactly-once consistency.

```scala
class KeyedStream[T, K] {
  def reduce(fun: (T, T) => T): DataStream[T]
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
  def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
  def sum(position: Int): DataStream[T]
  def min(position: Int): DataStream[T]
  def max(position: Int): DataStream[T]
}
```
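`mapWithState` threads a per-key state value through the stream: the function receives the current element plus the previous state, and returns an output plus the next state. Its contract, sketched in plain Scala for a single key (the running-count example is hypothetical):

```scala
// mapWithState contract: fun: (T, Option[S]) => (R, Option[S]),
// where the returned Option[S] becomes the state seen by the next element
def runStateful[T, S, R](elems: Seq[T])(fun: (T, Option[S]) => (R, Option[S])): Seq[R] = {
  var state: Option[S] = None // Flink keeps this per key, fault-tolerantly
  elems.map { elem =>
    val (out, next) = fun(elem, state)
    state = next
    out
  }
}

// Running count of elements seen so far (single key assumed)
val counts = runStateful(Seq("a", "b", "c")) { (x: String, st: Option[Int]) =>
  val n = st.getOrElse(0) + 1
  (n, Some(n))
}
// counts == Seq(1, 2, 3)
```

In Flink the state is partitioned by key and checkpointed, which is what upgrades this simple fold into exactly-once stateful processing.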
### Windowing and Time-Based Processing

Comprehensive windowing functionality for both keyed and non-keyed streams, including time-based and count-based windows with custom triggers and evictors.

```scala
class KeyedStream[T, K] {
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}

class DataStream[T] {
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
}
```
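Tumbling time windows partition the time axis into fixed, non-overlapping intervals: an element with timestamp `ts` belongs to the window starting at `ts - (ts % size)`. A plain-Scala sketch of that assignment rule for 5-second windows:

```scala
// Tumbling window assignment: compute the window start for a timestamp
def windowStart(ts: Long, sizeMillis: Long): Long = ts - (ts % sizeMillis)

val size = 5000L // 5-second windows, in milliseconds
val timestamps = Seq(1000L, 4999L, 5000L, 9000L, 12500L)

// Group elements by the window they fall into
val byWindow = timestamps.groupBy(windowStart(_, size))
// windows: [0, 5000), [5000, 10000), [10000, 15000)
```

Sliding windows generalize this: with a `slide` smaller than `size`, each element is assigned to every window whose interval covers its timestamp, so windows overlap.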
### Window Operations and Aggregations

Operations that can be applied to windowed streams including built-in aggregations, custom window functions, and incremental aggregations.

```scala
class WindowedStream[T, K, W <: Window] {
  def reduce(function: (T, T) => T): DataStream[T]
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  def sum(position: Int): DataStream[T]
  def min(position: Int): DataStream[T]
  def max(position: Int): DataStream[T]
}
```
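`reduce` combines window contents incrementally, so only a single running aggregate per key and window needs to be kept rather than the full element buffer that a `WindowFunction` receives. The end result, sketched per window in plain Scala:

```scala
// Incremental window reduce: fold each window's elements into one value
def windowedReduce[T](windows: Map[Long, Seq[T]])(f: (T, T) => T): Map[Long, T] =
  windows.map { case (start, elems) => start -> elems.reduce(f) }

// Summing two windows keyed by their start timestamp
val sums = windowedReduce(Map(0L -> Seq(1, 2, 3), 5000L -> Seq(4)))(_ + _)
// sums == Map(0L -> 6, 5000L -> 4)
```

This is why preferring `reduce`/`sum` over `apply` matters for long windows: the incremental form is constant-space per window, while a full window function must buffer every element until the window fires.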
### Stream Composition and Joining

Advanced stream composition operations including union, connect, join, and co-group operations for processing multiple streams together.

```scala
class DataStream[T] {
  def union(dataStreams: DataStream[T]*): DataStream[T]
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

class ConnectedStreams[IN1, IN2] {
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
```
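`ConnectedStreams.map(fun1, fun2)` applies one function per input stream and merges the outputs into a single stream of a common type. The merge semantics can be sketched with `Either` standing in for the two inputs (an illustrative model, not Flink's runtime representation):

```scala
// Co-map semantics: Left values come from stream 1, Right values from stream 2;
// both are mapped into one common output type R
def coMap[A, B, R](merged: Seq[Either[A, B]])(f1: A => R, f2: B => R): Seq[R] =
  merged.map {
    case Left(a)  => f1(a)
    case Right(b) => f2(b)
  }

// Ints from one stream, Strings from the other, both mapped to Int
val out = coMap(Seq(Left(1), Right("ab"), Left(3)))(_ * 10, _.length)
// out == Seq(10, 2, 30)
```

Unlike `union`, which requires both streams to have the same element type, `connect` keeps the two types distinct until the co-function unifies them.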
### Asynchronous I/O Operations

High-performance asynchronous I/O operations for external system integration with configurable parallelism, timeouts, and result ordering.

```scala
object AsyncDataStream {
  def unorderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN],
    asyncFunction: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]

  def orderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN],
    asyncFunction: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
```

### Data Sinks and Output

Comprehensive output functionality for writing stream results to various destinations including files, databases, message queues, and custom sinks.

```scala
class DataStream[T] {
  def print(): DataStreamSink[T]
  def writeAsText(path: String): DataStreamSink[T]
  def writeAsCsv(path: String): DataStreamSink[T]
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def addSink(fun: T => Unit): DataStreamSink[T]
}
```

### Function Interfaces and User-Defined Functions

Type-safe interfaces for implementing custom processing logic including window functions, process functions, and rich functions with lifecycle management.

```scala
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
```
### Scala Extensions and Partial Functions

Scala-specific extensions that enable partial function support for more idiomatic Scala programming with automatic conversion between partial and total functions.

```scala
import org.apache.flink.streaming.api.scala.extensions._

class OnDataStream[T] {
  def mapWith[R: TypeInformation](fun: PartialFunction[T, R]): DataStream[R]
  def filterWith(fun: PartialFunction[T, Boolean]): DataStream[T]
  def flatMapWith[R: TypeInformation](fun: PartialFunction[T, TraversableOnce[R]]): DataStream[R]
}
```
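The `...With` variants accept a `PartialFunction`, so you can pattern-match directly on stream elements instead of writing a total function with an explicit tuple parameter. A plain-Scala sketch of the style these extensions enable (note that `mapWith` applies the partial function as if it were total, in contrast to Scala's `collect`, which silently drops uncovered inputs):

```scala
// Pattern-matching on tuple elements, as mapWith allows on a DataStream
val pairs = Seq((1, "a"), (2, "b"))

// stream.mapWith { case (id, name) => ... } corresponds to:
val labels = pairs.map { case (id, name) => s"$id:$name" }

// collect, by contrast, keeps only inputs the partial function covers
val oddOnly = pairs.collect { case (id, name) if id % 2 == 1 => name }
```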
## Core Types

```scala
// Stream types
class DataStream[T]
class KeyedStream[T, K] extends DataStream[T]
class ConnectedStreams[IN1, IN2]
class WindowedStream[T, K, W <: Window]
class AllWindowedStream[T, W <: Window]
class SplitStream[T] extends DataStream[T]

// Environment and configuration types
class StreamExecutionEnvironment
class ExecutionConfig
class CheckpointConfig
class RestartStrategies

// Function types
trait WindowFunction[IN, OUT, KEY, W <: Window]
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
trait AsyncFunction[IN, OUT]
trait AsyncCollector[OUT]

// Composition types
class JoinedStreams[T1, T2]
class CoGroupedStreams[T1, T2]
class OutputTag[T]
```