DataStream represents a stream of elements of the same type. It provides the core stream processing operations for transforming, filtering, and routing data through your streaming application with type safety.
Core transformation operations that modify stream elements.
/**
* Basic stream transformations
*/
class DataStream[T] {
def map[R](mapper: T => R): DataStream[R]
def flatMap[R](flatMapper: T => TraversableOnce[R]): DataStream[R]
def filter(predicate: T => Boolean): DataStream[T]
def project(fieldIndexes: Int*): DataStream[Product]
}
Usage Examples:
import org.apache.flink.streaming.api.scala._
val dataStream = env.fromElements(1, 2, 3, 4, 5)
// Transform elements
val doubled = dataStream.map(_ * 2)
// Filter elements
val evenNumbers = dataStream.filter(_ % 2 == 0)
// Flat map for one-to-many transformations
val words = env.fromElements("hello world", "flink scala")
val splitWords = words.flatMap(_.split(" "))
Operations for partitioning streams by key for stateful processing.
/**
* Stream keying operations
*/
class DataStream[T] {
def keyBy[K](keySelector: T => K): KeyedStream[T, K]
def keyBy(fields: String*): KeyedStream[T, Tuple]
def keyBy(fields: Int*): KeyedStream[T, Tuple]
}
Usage Examples:
case class User(id: Int, name: String, age: Int)
val users = env.fromElements(
User(1, "Alice", 25),
User(2, "Bob", 30),
User(1, "Alice", 26)
)
// Key by field
val keyedByUserId = users.keyBy(_.id)
// Key by field name (for case classes)
val keyedByName = users.keyBy("name")
// Key by field index
val keyedByAge = users.keyBy(2)
Operations for combining multiple streams.
/**
* Stream combination operations
*/
class DataStream[T] {
def union(otherStreams: DataStream[T]*): DataStream[T]
def connect[T2](otherStream: DataStream[T2]): ConnectedStreams[T, T2]
def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}
Usage Examples:
val stream1 = env.fromElements(1, 2, 3)
val stream2 = env.fromElements(4, 5, 6)
val stream3 = env.fromElements(7, 8, 9)
// Union streams of same type
val unionStream = stream1.union(stream2, stream3)
// Connect streams of different types
val connectedStream = stream1.connect(env.fromElements("a", "b", "c"))
// Join two streams
val joinedStream = stream1.join(stream2)
Split streams into multiple output streams using OutputTags.
/**
* Side output operations
*/
class DataStream[T] {
def getSideOutput[X](sideOutputTag: OutputTag[X]): DataStream[X]
def split(splitter: T => TraversableOnce[String]): SplitStream[T]
}
Usage Examples:
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Tags identifying the two side outputs; both carry Int elements.
val evenTag = OutputTag[Int]("even-numbers")
val oddTag = OutputTag[Int]("odd-numbers")
val numbers = env.fromElements(1, 2, 3, 4, 5, 6)
val processedStream = numbers.process(new ProcessFunction[Int, String] {
  // Context is an inner class of ProcessFunction, so from Scala it has to be
  // referenced through a type projection (ProcessFunction[Int, String]#Context);
  // `ProcessFunction.Context` does not compile.
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, String]#Context,
      out: Collector[String]): Unit = {
    // Route the raw value to the side output matching its parity.
    if (value % 2 == 0) {
      ctx.output(evenTag, value)
    } else {
      ctx.output(oddTag, value)
    }
    // Every element is also emitted, as a String, on the main output.
    out.collect(s"processed: $value")
  }
})
// Side outputs are read from the stream returned by process().
val evenNumbers = processedStream.getSideOutput(evenTag)
val oddNumbers = processedStream.getSideOutput(oddTag)
Operations for sending stream data to external systems.
/**
* Output and sink operations
*/
class DataStream[T] {
def print(): DataStreamSink[T]
def print(sinkIdentifier: String): DataStreamSink[T]
def printToErr(): DataStreamSink[T]
def printToErr(sinkIdentifier: String): DataStreamSink[T]
def writeAsText(path: String): DataStreamSink[T]
def writeAsText(path: String, writeMode: WriteMode): DataStreamSink[T]
def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
def sinkTo(sink: Sink[T]): DataStreamSink[T]
}
Usage Examples:
val dataStream = env.fromElements("hello", "world", "flink")
// Print to console
dataStream.print()
// Print with identifier
dataStream.print("my-output")
// Write to file
dataStream.writeAsText("output/result.txt")
// Custom sink
dataStream.addSink(new MyCustomSinkFunction())
Methods for configuring stream properties.
/**
* Stream configuration operations
*/
class DataStream[T] {
def setParallelism(parallelism: Int): DataStream[T]
def getParallelism: Int
def setMaxParallelism(maxParallelism: Int): DataStream[T]
def getMaxParallelism: Int
def name(name: String): DataStream[T]
def uid(uid: String): DataStream[T]
def setUidHash(uidHash: String): DataStream[T]
def disableChaining(): DataStream[T]
def startNewChain(): DataStream[T]
def slotSharingGroup(slotSharingGroup: String): DataStream[T]
}
Operations for creating iterative streams (loops).
/**
* Iteration operations
*/
class DataStream[T] {
def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R]
def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), maxWaitTimeMillis: Long): DataStream[R]
}
Configure event time processing and watermark generation.
/**
* Time and watermark operations
*/
class DataStream[T] {
def assignAscendingTimestamps(timestampExtractor: T => Long): DataStream[T]
def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T]
}
Usage Examples:
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Event carries its own event-time timestamp in the `timestamp` field.
case class Event(timestamp: Long, data: String)
val events = env.fromElements(
  Event(1000L, "first"),
  Event(2000L, "second"),
  Event(3000L, "third")
)
// Assign ascending timestamps
val timestampedEvents = events.assignAscendingTimestamps(_.timestamp)
// Custom watermark strategy
// The element type is passed explicitly to forBoundedOutOfOrderness: Scala cannot
// infer it from the chained call, and without it withTimestampAssigner does not
// accept a SerializableTimestampAssigner[Event].
val watermarkedEvents = events.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(element: Event, recordTimestamp: Long): Long = element.timestamp
    })
)
Apply custom processing logic using ProcessFunction.
/**
* Process function operations
*/
class DataStream[T] {
def process[R](processFunction: ProcessFunction[T, R]): DataStream[R]
}
// Main stream class
// A stream of elements of the same type T; all operations listed on this page are defined on it.
class DataStream[T]
// Related stream types
// Returned by the keyBy operations; K is the key type.
class KeyedStream[T, K]
// Returned by connect; pairs two streams whose element types may differ.
class ConnectedStreams[T1, T2]
// Returned by join.
class JoinedStreams[T1, T2]
// Returned by coGroup.
class CoGroupedStreams[T1, T2]
// Returned by split.
class SplitStream[T]
// Sink types
// Returned by print, printToErr, writeAsText, addSink and sinkTo.
class DataStreamSink[T]
// Output and utility types
// Identifies a side output of element type T; passed to getSideOutput.
class OutputTag[T](id: String)
// Time-related types
// Argument of assignTimestampsAndWatermarks.
trait WatermarkStrategy[T]
// Supplies extractTimestamp(element, recordTimestamp); handed to withTimestampAssigner.
trait SerializableTimestampAssigner[T]
// Function types
// Argument of process; I is the input element type, O the output element type.
trait ProcessFunction[I, O]
// Argument of addSink.
trait SinkFunction[T]
// Argument of sinkTo.
trait Sink[T]