# Data Stream

DataStream represents a stream of elements of the same type. It provides the core type-safe operations for transforming, filtering, and routing data through a streaming application.

## Capabilities

### Basic Transformations

Core operations that transform, expand (one-to-many), or filter the elements of a stream.

```scala { .api }
/**
 * Basic element-wise transformations.
 *
 * In the Scala API, operations that produce a new element type `R` require an
 * implicit `TypeInformation[R]`. It is normally supplied by
 * `import org.apache.flink.streaming.api.scala._`.
 */
class DataStream[T] {
  /** Applies `mapper` to every element, producing exactly one output per input. */
  def map[R: TypeInformation](mapper: T => R): DataStream[R]

  /** Applies `flatMapper` to every element, emitting zero or more outputs per input. */
  def flatMap[R: TypeInformation](flatMapper: T => TraversableOnce[R]): DataStream[R]

  /** Keeps only the elements for which `predicate` returns `true`. */
  def filter(predicate: T => Boolean): DataStream[T]

  /** Selects the fields at the given positions of each element. */
  // NOTE(review): confirm this method exists on the Scala DataStream for your Flink version.
  def project(fieldIndexes: Int*): DataStream[Product]
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala._

// The wildcard import provides StreamExecutionEnvironment and the implicit
// TypeInformation instances that map/flatMap need.
val env = StreamExecutionEnvironment.getExecutionEnvironment

val dataStream = env.fromElements(1, 2, 3, 4, 5)

// Transform elements: exactly one output per input
val doubled = dataStream.map(_ * 2)

// Filter elements: keep only the even numbers
val evenNumbers = dataStream.filter(_ % 2 == 0)

// Flat map for one-to-many transformations: one output element per word
val words = env.fromElements("hello world", "flink scala")
val splitWords = words.flatMap(_.split(" "))

// These calls only build the pipeline. Nothing runs until a sink is attached
// and env.execute() is called.
```

### Keying Operations

Operations that partition a stream by key so that all elements sharing a key are processed together. This is the basis for per-key stateful processing.

```scala { .api }
/**
 * Keying operations: partition the stream so that all elements with the same key
 * are processed together, producing a [[KeyedStream]].
 */
class DataStream[T] {
  /**
   * Keys the stream by the value `keySelector` extracts from each element.
   * This is the preferred form. It requires an implicit `TypeInformation[K]`.
   */
  def keyBy[K: TypeInformation](keySelector: T => K): KeyedStream[T, K]

  /**
   * Keys the stream by field names (e.g. case-class fields).
   * Deprecated in recent Flink releases; prefer the key-selector variant.
   * `Tuple` is Flink's Java tuple type (`org.apache.flink.api.java.tuple.Tuple`).
   */
  def keyBy(fields: String*): KeyedStream[T, Tuple]

  /**
   * Keys the stream by field positions (0-based).
   * Deprecated in recent Flink releases; prefer the key-selector variant.
   */
  def keyBy(fields: Int*): KeyedStream[T, Tuple]
}
```

**Usage Examples:**

```scala
case class User(id: Int, name: String, age: Int)

val users = env.fromElements(
  User(1, "Alice", 25),
  User(2, "Bob", 30),
  User(1, "Alice", 26)
)

// Key by a key-selector function (recommended); both Alice records share key 1
val keyedByUserId = users.keyBy(_.id)

// Key by field name (for case classes); deprecated in recent Flink releases
val keyedByName = users.keyBy("name")

// Key by 0-based field position (2 = age); deprecated in recent Flink releases
val keyedByAge = users.keyBy(2)
```
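
### Stream Unions and Connections

Operations for combining multiple streams. `union` merges streams of the same element type, while `connect`, `join`, and `coGroup` combine this stream with one whose element type may differ.
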
```scala { .api }
/**
 * Combining streams.
 *
 * `union` requires every input to share the element type `T`.
 * `connect`, `join`, and `coGroup` accept a second stream whose element type `T2` may differ.
 */
class DataStream[T] {
  /** Merges this stream with all of `otherStreams` into one stream of `T`. */
  def union(otherStreams: DataStream[T]*): DataStream[T]

  /** Pairs this stream with `otherStream` while keeping both element types. */
  def connect[T2](otherStream: DataStream[T2]): ConnectedStreams[T, T2]

  /** Starts a join definition with `otherStream`. */
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]

  /** Starts a co-group definition with `otherStream`. */
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val stream1 = env.fromElements(1, 2, 3)
val stream2 = env.fromElements(4, 5, 6)
val stream3 = env.fromElements(7, 8, 9)

// Union streams of the same type
val unionStream = stream1.union(stream2, stream3)

// Connect streams of different types (Int and String)
val connectedStream = stream1.connect(env.fromElements("a", "b", "c"))

// Join two streams. join() alone returns a JoinedStreams builder, not a DataStream.
// The key selectors, a window, and a join function must also be supplied.
val joinedStream = stream1
  .join(stream2)
  .where(left => left % 3)
  .equalTo(right => right % 3)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  .apply((left, right) => left + right)
```

### Side Outputs

Emit additional output streams alongside the main output, each identified by an `OutputTag`.

```scala { .api }
/**
 * Side-output operations.
 */
class DataStream[T] {
  /**
   * Returns the side-output stream identified by `sideOutputTag`, i.e. the elements
   * a process function emitted with `ctx.output(sideOutputTag, value)`.
   */
  def getSideOutput[X: TypeInformation](sideOutputTag: OutputTag[X]): DataStream[X]

  /**
   * Legacy way to split a stream by routing elements to named outputs.
   * Deprecated, and removed in Flink 1.12 together with `SplitStream`.
   * Use side outputs (`OutputTag` + `getSideOutput`) instead.
   */
  def split(splitter: T => TraversableOnce[String]): SplitStream[T]
}
```
122
123
**Usage Examples:**
124
125
```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val evenTag = OutputTag[Int]("even-numbers")
val oddTag = OutputTag[Int]("odd-numbers")

val numbers = env.fromElements(1, 2, 3, 4, 5, 6)

val processedStream = numbers.process(new ProcessFunction[Int, String] {
  // Context is an inner class of ProcessFunction. Scala must therefore refer to it with a
  // type projection (ProcessFunction[Int, String]#Context), not ProcessFunction.Context.
  override def processElement(
      value: Int,
      ctx: ProcessFunction[Int, String]#Context,
      out: Collector[String]): Unit = {
    // Route each number to exactly one side output...
    if (value % 2 == 0) {
      ctx.output(evenTag, value)
    } else {
      ctx.output(oddTag, value)
    }
    // ...and also emit a description on the main output
    out.collect(s"processed: $value")
  }
})

// Side outputs are read from the stream returned by process()
val evenNumbers = processedStream.getSideOutput(evenTag)
val oddNumbers = processedStream.getSideOutput(oddTag)
```

### Sinks and Output

Operations for writing stream data to the console, to files, or to external systems.

```scala { .api }
/**
 * Sinks: terminal operations that emit the stream's elements and return a
 * [[DataStreamSink]] handle.
 */
class DataStream[T] {
  // Console output (stdout / stderr), optionally tagged with an identifier
  def print(): DataStreamSink[T]
  def print(sinkIdentifier: String): DataStreamSink[T]
  def printToErr(): DataStreamSink[T]
  def printToErr(sinkIdentifier: String): DataStreamSink[T]

  // Text-file output; the second overload takes an explicit write mode
  def writeAsText(path: String): DataStreamSink[T]
  def writeAsText(path: String, writeMode: WriteMode): DataStreamSink[T]

  // Custom sinks: SinkFunction-based and Sink-based variants
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def sinkTo(sink: Sink[T]): DataStreamSink[T]
}
```

**Usage Examples:**

```scala
val dataStream = env.fromElements("hello", "world", "flink")

// Print to stdout
dataStream.print()

// Print with an identifier that prefixes each printed line
dataStream.print("my-output")

// Write to a text file. With parallelism > 1, the path is created as a directory
// containing one file per parallel subtask.
dataStream.writeAsText("output/result.txt")

// Custom sink: MyCustomSinkFunction is a placeholder for your own SinkFunction[String]
dataStream.addSink(new MyCustomSinkFunction())

// Sinks only declare the pipeline; the job runs when env.execute() is called
env.execute("sink-examples")
```

### Stream Configuration

Methods for configuring operator properties such as parallelism, names and UIDs, operator chaining, and slot sharing.

```scala { .api }
/**
 * Operator configuration. The setters return a `DataStream[T]`, so calls can be
 * chained after the operation they configure.
 */
class DataStream[T] {
  // Parallelism
  def setParallelism(parallelism: Int): DataStream[T]
  def getParallelism: Int
  def setMaxParallelism(maxParallelism: Int): DataStream[T]
  def getMaxParallelism: Int

  // Identification: display name, user-assigned ID, or an explicit ID hash
  def name(name: String): DataStream[T]
  def uid(uid: String): DataStream[T]
  def setUidHash(uidHash: String): DataStream[T]

  // Chaining and slot sharing
  def disableChaining(): DataStream[T]
  def startNewChain(): DataStream[T]
  def slotSharingGroup(slotSharingGroup: String): DataStream[T]
}
```

### Iteration Operations

Operations for creating iterative streams, in which part of the output is fed back into the input (a loop).

```scala { .api }
/**
 * Iterations: feed part of a stream's output back into its input.
 *
 * `stepFunction` receives the iteration input and returns a pair
 * (feedback stream of `T`, result stream of `R`). The second element becomes the result.
 */
class DataStream[T] {
  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R]

  /** As above; `maxWaitTimeMillis` bounds how long the iteration waits for input before terminating. */
  def iterate[R](
      stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
      maxWaitTimeMillis: Long): DataStream[R]
}
```

### Time and Watermark Operations

Operations for assigning event-time timestamps to elements and generating watermarks.

```scala { .api }
/**
 * Event-time support: attach timestamps to elements and generate watermarks.
 */
class DataStream[T] {
  /** Uses `timestampExtractor` as each element's timestamp; meant for timestamps that never decrease. */
  def assignAscendingTimestamps(timestampExtractor: T => Long): DataStream[T]

  /** Assigns timestamps and watermarks as described by `watermarkStrategy`. */
  def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T]
}
```

**Usage Examples:**

```scala
import java.time.Duration

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._

case class Event(timestamp: Long, data: String)

val events = env.fromElements(
  Event(1000L, "first"),
  Event(2000L, "second"),
  Event(3000L, "third")
)

// Assign ascending timestamps (intended for streams whose timestamps never decrease)
val timestampedEvents = events.assignAscendingTimestamps(_.timestamp)

// Custom watermark strategy tolerating up to 5 seconds of out-of-orderness.
// The explicit [Event] type argument is required. Without it, Scala infers
// WatermarkStrategy[Nothing] and withTimestampAssigner does not type-check.
val watermarkedEvents = events.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(element: Event, recordTimestamp: Long): Long =
        element.timestamp
    })
)
```

### Process Functions

Apply custom, low-level processing logic with a `ProcessFunction`.

```scala { .api }
/**
 * Low-level processing with a ProcessFunction.
 */
class DataStream[T] {
  /**
   * Invokes `processFunction` for every element. The function may emit any number of
   * `R` values to the main output and may also write to side outputs through its context.
   * Like other operations that produce a new element type, it needs an implicit
   * `TypeInformation[R]`.
   */
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}
```

## Types

```scala { .api }
// Outline of the types referenced above (members omitted)

// Main stream class
class DataStream[T]

// Related stream types
class KeyedStream[T, K]
class ConnectedStreams[T1, T2]
class JoinedStreams[T1, T2]
class CoGroupedStreams[T1, T2]
class SplitStream[T] // deprecated; removed in Flink 1.12 in favour of side outputs

// Sink types
class DataStreamSink[T]

// Output and utility types
class OutputTag[T: TypeInformation](id: String)
abstract class TypeInformation[T] // type descriptor that many operations require implicitly

// Time-related types
trait WatermarkStrategy[T]
trait SerializableTimestampAssigner[T]

// Function types
abstract class ProcessFunction[I, O] // an abstract class, not an interface/trait
trait SinkFunction[T]
trait Sink[T]
```