Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
—
DataStream represents the core abstraction for processing streams of data in Flink. It provides a rich set of transformation operations while maintaining type safety through Scala's type system.
Access stream metadata and configure stream behavior.
/**
 * Stream metadata accessors and per-operator configuration.
 *
 * The setter-style methods (setParallelism, setMaxParallelism, name, uid)
 * return a new [[DataStream]] rather than mutating this one, so calls can
 * be chained fluently.
 *
 * @tparam T Type of the elements flowing through this stream
 */
class DataStream[T] {
/**
 * Get the type information for stream elements
 * @return TypeInformation for type T
 */
def dataType: TypeInformation[T]
/**
 * Get the execution environment associated with this stream
 * @return StreamExecutionEnvironment instance
 */
def executionEnvironment: StreamExecutionEnvironment
/**
 * Get the current parallelism for this stream
 * @return Current parallelism degree
 */
def parallelism: Int
/**
 * Set the parallelism for this operation
 * @param parallelism Parallelism degree
 * @return New DataStream with specified parallelism
 */
def setParallelism(parallelism: Int): DataStream[T]
/**
 * Set the maximum parallelism for this operation
 * @param maxParallelism Maximum parallelism degree
 * @return New DataStream with specified max parallelism
 */
def setMaxParallelism(maxParallelism: Int): DataStream[T]
/**
 * Set a name for this operation
 * @param name Operator name
 * @return New DataStream with specified name
 */
def name(name: String): DataStream[T]
/**
 * Set a unique identifier for this operation
 * @param uid Unique identifier
 * @return New DataStream with specified UID
 */
def uid(uid: String): DataStream[T]
}

Core transformation operations for modifying stream elements.
/**
 * Core element-wise transformations: map, flatMap and filter.
 *
 * The generic variants require an implicit [[TypeInformation]] for the
 * result type, expressed as a context bound on R.
 *
 * @tparam T Type of the input elements
 */
class DataStream[T] {
/**
 * Apply a function to each element in the stream
 * @tparam R Result element type (needs implicit TypeInformation)
 * @param fun Mapping function from T to R
 * @return DataStream of mapped elements
 */
def map[R: TypeInformation](fun: T => R): DataStream[R]
/**
 * Apply a MapFunction to each element
 * @tparam R Result element type (needs implicit TypeInformation)
 * @param mapper MapFunction implementation
 * @return DataStream of mapped elements
 */
def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]
/**
 * Apply a function that returns multiple elements for each input
 * @tparam R Result element type (needs implicit TypeInformation)
 * @param fun Function returning TraversableOnce of R
 * @return DataStream of flattened results
 */
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
/**
 * Apply a FlatMapFunction that outputs to a Collector
 * @tparam R Result element type (needs implicit TypeInformation)
 * @param fun Function that outputs zero or more records to the Collector
 * @return DataStream of collected results
 */
def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]
/**
 * Filter elements based on a predicate; elements for which the predicate
 * returns true are retained
 * @param fun Predicate function returning Boolean
 * @return DataStream of filtered elements
 */
def filter(fun: T => Boolean): DataStream[T]
/**
 * Filter elements using a FilterFunction
 * @param filter FilterFunction implementation
 * @return DataStream of filtered elements
 */
def filter(filter: FilterFunction[T]): DataStream[T]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
// Obtain the environment and build a small bounded stream of integers
val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)
// Map transformation
val doubled = numbers.map(_ * 2)
// FlatMap transformation: one input line yields one element per word
val words = env.fromElements("hello world", "scala flink")
.flatMap(_.split(" "))
// Filter transformation
val evenNumbers = numbers.filter(_ % 2 == 0)
// Chaining transformations (each call returns a new DataStream)
val result = numbers
.filter(_ > 2)
.map(_ * 3)
.filter(_ < 15)

Control how stream elements are distributed across parallel instances.
/**
 * Physical partitioning: control how elements are distributed across the
 * parallel instances of the downstream operator.
 *
 * @tparam T Type of the elements in this stream
 */
class DataStream[T] {
/**
 * Partition by key using a key selector function
 * @tparam K Key type (needs implicit TypeInformation)
 * @param fun Key selector function
 * @return KeyedStream partitioned by the key
 */
def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
/**
 * Partition by key using a KeySelector
 * @tparam K Key type (needs implicit TypeInformation)
 * @param fun KeySelector implementation
 * @return KeyedStream partitioned by the key
 */
def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K]
/**
 * Custom partitioning using a Partitioner
 * @tparam K Key type (needs implicit TypeInformation)
 * @param partitioner Custom partitioner implementation
 * @param fun Key selector for partitioning
 * @return DataStream with custom partitioning
 */
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]
/**
 * Broadcast all elements to all downstream operators
 * @return DataStream with broadcast partitioning
 */
def broadcast: DataStream[T]
/**
 * Round-robin distribution across parallel instances
 * @return DataStream with rebalanced partitioning
 */
def rebalance: DataStream[T]
/**
 * Local round-robin within the same TaskManager
 * @return DataStream with rescaled partitioning
 */
def rescale: DataStream[T]
/**
 * Random distribution across parallel instances
 * @return DataStream with shuffle partitioning
 */
def shuffle: DataStream[T]
/**
 * Forward elements to next operator (no redistribution)
 * @return DataStream with forward partitioning
 */
def forward: DataStream[T]
/**
 * Send all elements to the first parallel instance
 * @return DataStream with global partitioning
 */
def global: DataStream[T]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
case class User(id: Int, name: String, department: String)
// `env` is the StreamExecutionEnvironment obtained in the earlier example
val users = env.fromElements(
User(1, "Alice", "Engineering"),
User(2, "Bob", "Sales"),
User(3, "Charlie", "Engineering")
)
// Key by user department (all users of one department go to the same partition)
val usersByDept = users.keyBy(_.department)
// Key by user ID
val usersById = users.keyBy(_.id)
// Rebalance for load distribution
val balanced = users.rebalance
// Broadcast to all downstream operators
val broadcast = users.broadcast

Combine multiple streams into unified processing pipelines.
/**
 * Multi-stream combinators: union same-typed streams or connect
 * differently-typed streams for co-processing.
 *
 * @tparam T Type of the elements in this stream
 */
class DataStream[T] {
/**
 * Union this stream with other streams of the same type
 * @param dataStreams Other streams to union with (varargs)
 * @return DataStream containing elements from all input streams
 */
def union(dataStreams: DataStream[T]*): DataStream[T]
/**
 * Connect this stream with another stream of different type
 * @tparam T2 Element type of the other stream
 * @param dataStream Stream to connect with
 * @return ConnectedStreams for co-processing
 */
def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
/**
 * Connect with a broadcast stream for broadcast state
 * @tparam R Element type of the broadcast stream
 * @param broadcastStream Broadcast stream to connect with
 * @return BroadcastConnectedStream for broadcast processing
 */
def connect[R](broadcastStream: BroadcastStream[R]): BroadcastConnectedStream[T, R]
}

Apply windowing operations on non-keyed streams.
/**
 * Windowing over the whole (non-keyed) stream. All-window operations run
 * with parallelism 1 conceptually, since every element falls into the same
 * window group — prefer keyed windows where possible.
 * NOTE(review): the parallelism-1 remark reflects the usual Flink contract
 * for windowAll — confirm against the runtime docs for this version.
 *
 * @tparam T Type of the elements in this stream
 */
class DataStream[T] {
/**
 * Apply time-based tumbling windowing to all elements (deprecated)
 * @param size Window size
 * @return AllWindowedStream for aggregations
 */
@deprecated("Use windowAll(TumblingEventTimeWindows.of(size))", "1.12.0")
def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
/**
 * Apply time-based sliding windowing to all elements (deprecated)
 * @param size Window size
 * @param slide Slide interval
 * @return AllWindowedStream for aggregations
 */
@deprecated("Use windowAll(SlidingEventTimeWindows.of(size, slide))", "1.12.0")
def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]
/**
 * Apply count-based windowing to all elements
 * @param size Window size (number of elements)
 * @return AllWindowedStream for aggregations
 */
def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
/**
 * Apply sliding count-based windowing to all elements
 * @param size Window size (number of elements)
 * @param slide Slide size (number of elements)
 * @return AllWindowedStream for aggregations
 */
def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]
/**
 * Apply custom windowing to all elements
 * @tparam W Window type produced by the assigner
 * @param assigner Window assigner implementation
 * @return AllWindowedStream for aggregations
 */
def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]
}

Configure event time processing and watermark generation.
/**
 * Event-time configuration: timestamp extraction and watermark generation.
 *
 * @tparam T Type of the elements in this stream
 */
class DataStream[T] {
/**
 * Assign timestamps and watermarks using a WatermarkStrategy
 * @param watermarkStrategy Strategy for timestamp and watermark assignment
 * @return DataStream with assigned timestamps
 */
def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T]
/**
 * Assign ascending timestamps (deprecated)
 * @param extractor Function to extract timestamps
 * @return DataStream with assigned timestamps
 */
// Annotation added for consistency with the deprecated timeWindowAll
// overloads above, which carry @deprecated rather than a comment-only note.
@deprecated("Use assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps)", "1.12.0")
def assignAscendingTimestamps(extractor: T => Long): DataStream[T]
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import java.time.Duration
case class Event(id: String, timestamp: Long, value: Double)
// `env` is the StreamExecutionEnvironment obtained in the earlier example
val events = env.fromElements(
Event("A", 1000L, 1.0),
Event("B", 2000L, 2.0),
Event("C", 3000L, 3.0)
)
// Assign watermarks for event time processing, tolerating events that
// arrive up to 5 seconds out of order
val eventsWithWatermarks = events
.assignTimestampsAndWatermarks(
WatermarkStrategy
.forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner[Event] {
// Use the event's own embedded timestamp as its event time
override def extractTimestamp(element: Event, recordTimestamp: Long): Long =
element.timestamp
})
)

Create iterative processing patterns for complex algorithms.
/**
 * Iterative stream processing with a feedback loop: the step function
 * returns both the feedback stream (fed back into the iteration) and the
 * output stream (forwarded downstream).
 *
 * NOTE(review): these overloads differ in arity but one carries a default
 * argument — overloading combined with defaults can surprise overload
 * resolution; callers should pass maxWaitTimeMillis explicitly when in doubt.
 *
 * @tparam T Type of the elements in this stream
 */
class DataStream[T] {
/**
 * Create an iteration with feedback loop
 * @tparam R Output element type
 * @param stepFunction Function defining iteration step; returns
 *                     (feedback stream, output stream)
 * @param maxWaitTimeMillis Maximum wait time for iteration (0 = default)
 * @return DataStream with iteration results
 */
def iterate[R](
stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
maxWaitTimeMillis: Long = 0
): DataStream[R]
/**
 * Create an iteration with connected streams
 * @tparam R Output element type
 * @tparam F Feedback element type (needs implicit TypeInformation)
 * @param stepFunction Function with connected streams step; returns
 *                     (feedback stream, output stream)
 * @param maxWaitTimeMillis Maximum wait time for iteration
 * @return DataStream with iteration results
 */
def iterate[R, F: TypeInformation](
stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
maxWaitTimeMillis: Long
): DataStream[R]
}

Apply custom processing logic with access to runtime context.
/**
 * Low-level per-element processing with access to the runtime context.
 *
 * @tparam T Type of the input elements
 */
class DataStream[T] {
/**
 * Apply a ProcessFunction for low-level processing
 * @tparam R Output element type (needs implicit TypeInformation)
 * @param processFunction ProcessFunction implementation
 * @return DataStream with processed results
 */
def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}

Extract additional output streams from processing functions.
/**
 * Side-output access: retrieve a secondary stream emitted by an upstream
 * processing function under a given [[OutputTag]].
 *
 * @tparam T Type of the main-stream elements
 */
class DataStream[T] {
/**
 * Get a side output stream by tag
 * @tparam X Element type of the side output (needs implicit TypeInformation)
 * @param tag OutputTag identifying the side output
 * @return DataStream of side output elements
 */
def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X]
}

Low-level operations for custom stream processing.
/**
 * Expert-level operations: plug in a custom operator or cache the stream's
 * intermediate result for reuse.
 *
 * @tparam T Type of the input elements
 */
class DataStream[T] {
/**
 * Apply a custom stream operator
 * @tparam R Output element type (needs implicit TypeInformation)
 * @param operatorName Name for the operator
 * @param operator Custom operator implementation
 * @return DataStream with custom transformation
 */
def transform[R: TypeInformation](
operatorName: String,
operator: OneInputStreamOperator[T, R]
): DataStream[R]
/**
 * Cache this stream for reuse in multiple downstream operations
 * @return CachedDataStream for reuse
 */
def cache(): CachedDataStream[T]
}

// Core function interfaces
/**
 * SAM interface for one-to-one element transformation (see DataStream.map).
 *
 * @tparam T Input element type
 * @tparam R Output element type
 */
trait MapFunction[T, R] {
/** Transform a single input value into exactly one output value. */
def map(value: T): R
}
/**
 * SAM interface for one-to-many transformation (see DataStream.flatMap).
 * Results are emitted through the supplied Collector, so zero, one or many
 * records may be produced per input.
 *
 * @tparam T Input element type
 * @tparam R Output element type
 */
trait FlatMapFunction[T, R] {
/** Emit any number of output records for `value` via `out.collect`. */
def flatMap(value: T, out: Collector[R]): Unit
}
/**
 * SAM interface for predicates used by DataStream.filter.
 *
 * @tparam T Element type being tested
 */
trait FilterFunction[T] {
/** @return true to retain the element, false to drop it */
def filter(value: T): Boolean
}
// Key selector interface
/**
 * Extracts the grouping key from an element (see DataStream.keyBy).
 *
 * @tparam T Input element type
 * @tparam K Key type
 */
trait KeySelector[T, K] {
/** @return the key under which `value` is grouped */
def getKey(value: T): K
}
// Partitioner interface
/**
 * Computes the target partition for a key (see DataStream.partitionCustom).
 *
 * @tparam K Key type
 */
trait Partitioner[K] {
/**
 * @param key Key extracted from the element
 * @param numPartitions Total number of available partitions
 * @return Target partition index, expected in the range [0, numPartitions)
 */
def partition(key: K, numPartitions: Int): Int
}
// Collector interface for output
/**
 * Sink for records emitted by user functions (e.g. FlatMapFunction).
 *
 * @tparam T Type of records accepted
 */
trait Collector[T] {
/** Emit one record downstream. */
def collect(record: T): Unit
/** Close the collector (end of output). */
def close(): Unit
}
// Output tag for side outputs
/**
 * Typed tag identifying a side-output stream; pass the same tag to
 * DataStream.getSideOutput to retrieve the tagged stream.
 *
 * @tparam T Element type of the side output (needs implicit TypeInformation)
 * @param id Stable string identifier for the side output
 */
case class OutputTag[T: TypeInformation](id: String) {
/** @return TypeInformation captured for the tag's element type T */
def getTypeInfo: TypeInformation[T]
}
// Cached data stream
/**
 * A DataStream whose intermediate result has been cached (via
 * DataStream.cache()) for reuse by multiple downstream operations.
 *
 * @tparam T Type of the cached elements
 * @param dataStream The stream whose result is cached
 */
class CachedDataStream[T](dataStream: DataStream[T]) extends DataStream[T] {
/** Invalidate the cached result so its resources can be released. */
def invalidateCache(): Unit
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12