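# Data Sources and Streams

`DataStream[T]` is the core abstraction for processing streams of data in Flink's Scala API. It provides a rich set of transformation operations and keeps them type-safe: most transformations require an implicit `TypeInformation` for their result type (the `R: TypeInformation` context bounds below), which Flink uses to serialize elements and which `import org.apache.flink.streaming.api.scala._` derives automatically.

## Capabilities

### Stream Properties and Configuration

Read stream metadata and configure the operator that produces the stream.
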
```scala { .api }
class DataStream[T] {
  /**
   * Get the type information for stream elements
   * @return TypeInformation for type T
   */
  def dataType: TypeInformation[T]

  /**
   * Get the execution environment associated with this stream
   * @return StreamExecutionEnvironment instance
   */
  def executionEnvironment: StreamExecutionEnvironment

  /**
   * Get the current parallelism for this stream
   * @return Current parallelism degree
   */
  def parallelism: Int

  /**
   * Set the parallelism of the operator that produces this stream
   * @param parallelism Parallelism degree
   * @return The stream with the parallelism applied (for method chaining)
   */
  def setParallelism(parallelism: Int): DataStream[T]

  /**
   * Set the maximum parallelism of the operator: the upper bound for rescaling
   * and the number of key groups used for keyed state
   * @param maxParallelism Maximum parallelism degree
   * @return The stream with the max parallelism applied (for method chaining)
   */
  def setMaxParallelism(maxParallelism: Int): DataStream[T]

  /**
   * Set a name for this operator (shown in logs and the web UI)
   * @param name Operator name
   * @return The stream with the name applied (for method chaining)
   */
  def name(name: String): DataStream[T]

  /**
   * Set a unique, stable identifier for this operator, used to match operator
   * state when restoring from a savepoint
   * @param uid Unique identifier
   * @return The stream with the UID applied (for method chaining)
   */
  def uid(uid: String): DataStream[T]
}
```
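
A minimal sketch of these methods on a small pipeline. The operator name, UID, and parallelism values are illustrative assumptions, and no job is executed: the sketch only builds the job graph and reads back its metadata.

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Each setter configures the map operator and returns the stream for chaining
val parsed: DataStream[Int] = env
  .fromElements("1", "2", "3")
  .map(_.toInt)
  .name("parse-ints")     // illustrative name shown in the web UI
  .uid("parse-ints-v1")   // illustrative stable ID for savepoint state mapping
  .setParallelism(2)
  .setMaxParallelism(128) // should not be lower than the parallelism

// Read back the stream's metadata
println(parsed.parallelism) // 2
println(parsed.dataType)    // the TypeInformation derived for Int
```

Setting `uid` explicitly matters once savepoints are involved: auto-generated operator IDs can change when the job topology changes, after which restored state may no longer match its operator.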

### Basic Transformations

Element-wise transformations: `map` produces exactly one output per input, `flatMap` produces zero or more, and `filter` keeps the elements for which a predicate returns `true`.

```scala { .api }
class DataStream[T] {
  /**
   * Apply a function to each element in the stream
   * @param fun Mapping function from T to R
   * @return DataStream of mapped elements
   */
  def map[R: TypeInformation](fun: T => R): DataStream[R]

  /**
   * Apply a MapFunction to each element
   * @param mapper MapFunction implementation
   * @return DataStream of mapped elements
   */
  def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]

  /**
   * Apply a function that returns zero or more elements for each input
   * @param fun Function returning a TraversableOnce of R
   * @return DataStream of flattened results
   */
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]

  /**
   * Apply a function that emits zero or more elements through a Collector
   * @param fun Function that receives each element and a Collector for its output
   * @return DataStream of collected results
   */
  def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]

  /**
   * Filter elements based on a predicate
   * @param fun Predicate; elements for which it returns true are kept
   * @return DataStream of filtered elements
   */
  def filter(fun: T => Boolean): DataStream[T]

  /**
   * Filter elements using a FilterFunction
   * @param filter FilterFunction implementation
   * @return DataStream of filtered elements
   */
  def filter(filter: FilterFunction[T]): DataStream[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Map transformation
val doubled = numbers.map(_ * 2)

// FlatMap transformation
val words = env.fromElements("hello world", "scala flink")
  .flatMap(_.split(" "))

// Filter transformation
val evenNumbers = numbers.filter(_ % 2 == 0)

// Chaining transformations: 3, 4, 5 -> 9, 12, 15 -> 9, 12
val result = numbers
  .filter(_ > 2)
  .map(_ * 3)
  .filter(_ < 15)

// Transformations are lazy: attach a sink and trigger execution
result.print()
env.execute("basic transformations")
```
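
The overloads that take function objects suit logic that is reused or unit-tested on its own, and the `Collector` form of `flatMap` can emit any number of results per element. A minimal sketch; `ToUpper`, `NonEmpty`, and `splitWords` are hypothetical names introduced for illustration.

```scala
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Reusable function objects (hypothetical names)
class ToUpper extends MapFunction[String, String] {
  override def map(value: String): String = value.toUpperCase
}

class NonEmpty extends FilterFunction[String] {
  override def filter(value: String): Boolean = value.nonEmpty
}

// Collector-style flatMap logic: emits every space-separated token, including
// empty ones. Declared as a Function2 value so the call below selects the
// (T, Collector[R]) => Unit overload of flatMap.
val splitWords: (String, Collector[String]) => Unit =
  (line, out) => line.split(" ").foreach(out.collect)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val upper: DataStream[String] = env
  .fromElements("hello world", "", "scala flink")
  .flatMap(splitWords)
  .filter(new NonEmpty) // drops the empty token produced by the empty line
  .map(new ToUpper)
```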

### Stream Partitioning

Control how stream elements are distributed across parallel instances.

```scala { .api }
class DataStream[T] {
  /**
   * Partition by key using a key selector function; all elements with the
   * same key are sent to the same parallel instance
   * @param fun Key selector function
   * @return KeyedStream partitioned by the key
   */
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]

  /**
   * Partition by key using a KeySelector
   * @param fun KeySelector implementation
   * @return KeyedStream partitioned by the key
   */
  def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K]

  /**
   * Custom partitioning using a Partitioner
   * @param partitioner Maps each key to the index of a downstream parallel instance
   * @param fun Key selector for partitioning
   * @return DataStream with custom partitioning
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]

  /**
   * Send every element to all parallel instances of the downstream operator
   * @return DataStream with broadcast partitioning
   */
  def broadcast: DataStream[T]

  /**
   * Round-robin distribution across all downstream parallel instances
   * @return DataStream with rebalanced partitioning
   */
  def rebalance: DataStream[T]

  /**
   * Round-robin distribution to a subset of downstream instances, determined by
   * the ratio of upstream to downstream parallelism (avoids a full redistribution)
   * @return DataStream with rescaled partitioning
   */
  def rescale: DataStream[T]

  /**
   * Uniformly random distribution across downstream parallel instances
   * @return DataStream with shuffle partitioning
   */
  def shuffle: DataStream[T]

  /**
   * Forward each element to the corresponding downstream instance (no
   * redistribution); requires equal upstream and downstream parallelism
   * @return DataStream with forward partitioning
   */
  def forward: DataStream[T]

  /**
   * Send all elements to the first parallel instance of the downstream operator
   * @return DataStream with global partitioning
   */
  def global: DataStream[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala._

case class User(id: Int, name: String, department: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice", "Engineering"),
  User(2, "Bob", "Sales"),
  User(3, "Charlie", "Engineering")
)

// Key by user department
val usersByDept = users.keyBy(_.department)

// Key by user ID
val usersById = users.keyBy(_.id)

// Rebalance for even load distribution
val balanced = users.rebalance

// Send every element to all downstream parallel instances
val broadcast = users.broadcast
```
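
A minimal sketch of `partitionCustom`, assuming a hypothetical `DepartmentPartitioner` that assigns each department to a downstream channel by hash code. The `Partitioner` must return an index in `[0, numPartitions)`.

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala._

case class User(id: Int, name: String, department: String)

// Hypothetical partitioner: maps each department to a channel by hash code.
// Math.floorMod keeps the result in [0, numPartitions) even for negative hashes.
class DepartmentPartitioner extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

val users = env.fromElements(
  User(1, "Alice", "Engineering"),
  User(2, "Bob", "Sales"),
  User(3, "Charlie", "Engineering")
)

// Users of the same department reach the same downstream instance; unlike
// keyBy, the result is a plain DataStream without keyed state
val partitioned: DataStream[User] =
  users.partitionCustom(new DepartmentPartitioner, (u: User) => u.department)
```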
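
### Stream Union and Connections

Combine multiple streams: `union` merges streams of the same type into one, while `connect` pairs two streams, possibly of different types, so that a single co-function can process both.
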
```scala { .api }
class DataStream[T] {
  /**
   * Union this stream with other streams of the same type
   * @param dataStreams Other streams to union with
   * @return DataStream containing elements from all input streams (no deduplication)
   */
  def union(dataStreams: DataStream[T]*): DataStream[T]

  /**
   * Connect this stream with another stream whose element type may differ
   * @param dataStream Stream to connect with
   * @return ConnectedStreams for co-processing both inputs
   */
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]

  /**
   * Connect with a broadcast stream for broadcast state
   * @param broadcastStream Broadcast stream to connect with
   * @return BroadcastConnectedStream for broadcast processing
   */
  def connect[R](broadcastStream: BroadcastStream[R]): BroadcastConnectedStream[T, R]
}
```
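
A minimal sketch contrasting `union` and `connect`; the stream contents are made up. `ConnectedStreams.map` takes one function per input, and both must return the same type.

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val clicks = env.fromElements("click-a", "click-b")
val views = env.fromElements("view-a")
val counts = env.fromElements(1, 2, 3)

// union: all inputs must share the element type (String here)
val allEvents: DataStream[String] = clicks.union(views)

// connect: the inputs may differ (String and Int); one function per input,
// both producing the common output type String
val merged: DataStream[String] = allEvents
  .connect(counts)
  .map(
    (event: String) => s"event:$event",
    (n: Int) => s"count:$n"
  )
```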
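
### Windowing (All-Window Operations)

Apply windowing operations on non-keyed streams. All-window operators are not parallel: every element is processed by a single task (parallelism 1), so keyed windows are usually preferable for high-volume streams.
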
```scala { .api }
class DataStream[T] {
  /**
   * Apply time-based tumbling windows to all elements (deprecated)
   * @param size Window size
   * @return AllWindowedStream for aggregations
   */
  @deprecated("Use windowAll(TumblingEventTimeWindows.of(size))", "1.12.0")
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]

  /**
   * Apply time-based sliding windows to all elements (deprecated)
   * @param size Window size
   * @param slide Slide interval
   * @return AllWindowedStream for aggregations
   */
  @deprecated("Use windowAll(SlidingEventTimeWindows.of(size, slide))", "1.12.0")
  def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]

  /**
   * Apply tumbling count windows to all elements
   * @param size Window size (number of elements)
   * @return AllWindowedStream for aggregations
   */
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]

  /**
   * Apply sliding count windows to all elements
   * @param size Window size (number of elements)
   * @param slide Slide size (number of elements)
   * @return AllWindowedStream for aggregations
   */
  def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]

  /**
   * Apply custom windowing to all elements
   * @param assigner Window assigner implementation
   * @return AllWindowedStream for aggregations
   */
  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]
}
```
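
A minimal sketch of the all-window methods on a small bounded stream. The `windowAll` call shows the explicit-assigner form that replaces the deprecated `timeWindowAll(size)`; the deprecation note suggests the event-time assigner, and the processing-time one is used here only so the sketch needs no timestamps. The 10-second size is arbitrary.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5, 6)

// Tumbling count window over the whole stream: sums 1+2+3 and 4+5+6
val countSums: DataStream[Int] = numbers
  .countWindowAll(3)
  .reduce(_ + _)

// Explicit assigner instead of the deprecated timeWindowAll(size):
// 10-second tumbling windows in processing time
val timeSums: DataStream[Int] = numbers
  .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))
  .reduce(_ + _)
```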

### Time and Watermarks

Assign event-time timestamps to elements and generate watermarks, which tell event-time operators such as windows how far event time has progressed.

```scala { .api }
class DataStream[T] {
  /**
   * Assign timestamps and watermarks using a WatermarkStrategy
   * @param watermarkStrategy Strategy for timestamp and watermark assignment
   * @return DataStream with assigned timestamps
   */
  def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T]

  /**
   * Assign timestamps for streams whose timestamps ascend monotonically
   * within each parallel instance (deprecated)
   * @param extractor Function to extract timestamps
   * @return DataStream with assigned timestamps
   */
  def assignAscendingTimestamps(extractor: T => Long): DataStream[T]
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import java.time.Duration

case class Event(id: String, timestamp: Long, value: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val events = env.fromElements(
  Event("A", 1000L, 1.0),
  Event("B", 2000L, 2.0),
  Event("C", 3000L, 3.0)
)

// Take timestamps (in milliseconds) from Event.timestamp and emit watermarks
// that tolerate events arriving up to 5 seconds out of order
val eventsWithWatermarks = events
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(element: Event, recordTimestamp: Long): Long =
          element.timestamp
      })
  )
```
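
For streams whose timestamps never decrease within a parallel instance (the assumption behind `assignAscendingTimestamps`), a minimal sketch using `WatermarkStrategy.forMonotonousTimestamps` followed by an event-time window. The `Event` class mirrors the example above; the 2-second window size is arbitrary.

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(id: String, timestamp: Long, value: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val events = env.fromElements(
  Event("A", 1000L, 1.0),
  Event("B", 2000L, 2.0),
  Event("C", 3000L, 3.0)
)

// Timestamps never decrease, so no out-of-orderness bound is needed:
// the watermark trails the highest timestamp seen so far
val ascending: DataStream[Event] = events.assignTimestampsAndWatermarks(
  WatermarkStrategy
    .forMonotonousTimestamps[Event]()
    .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
      override def extractTimestamp(element: Event, recordTimestamp: Long): Long =
        element.timestamp
    })
)

// Event-time windows fire based on these timestamps and watermarks:
// [0 s, 2 s) contains A, [2 s, 4 s) contains B and C
val totals: DataStream[Double] = ascending
  .map(_.value)
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(2)))
  .reduce(_ + _)
```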

### Iterations

Create feedback loops in which part of an iteration's output is sent back to the iteration head, for algorithms that repeatedly refine results until a condition holds.

```scala { .api }
class DataStream[T] {
  /**
   * Create an iteration with a feedback loop
   * @param stepFunction Function that receives the iteration head and returns a
   *                     (feedback, output) pair; the feedback stream is sent back to the head
   * @param maxWaitTimeMillis Time after which the iteration head terminates if it
   *                          receives no feedback; 0 (the default) means it never terminates
   * @return DataStream of the iteration output
   */
  def iterate[R](
      stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
      maxWaitTimeMillis: Long = 0
  ): DataStream[R]

  /**
   * Create an iteration whose feedback type F may differ from the input type T;
   * the step function receives the input and the feedback as ConnectedStreams
   * @param stepFunction Function returning a (feedback, output) pair
   * @param maxWaitTimeMillis Time after which the iteration head terminates if it
   *                          receives no feedback
   * @return DataStream of the iteration output
   */
  def iterate[R, F: TypeInformation](
      stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
      maxWaitTimeMillis: Long
  ): DataStream[R]
}
```
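
A minimal sketch of the single-stream `iterate` variant: each value is decremented, positive results are fed back to the head, and values that reach zero leave the loop. The parallelism is fixed to 1 to keep the feedback edge valid, and the 5-second timeout is an arbitrary choice that lets the job finish once no more feedback arrives.

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// The feedback stream must have the same parallelism as the iteration head,
// which inherits it from the input; a single parallelism keeps this simple
env.setParallelism(1)

val numbers: DataStream[Long] = env.fromElements(3L, 5L, 8L)

// Decrement until zero: positive values are fed back, the rest leave the loop.
// The lambda parameter is typed explicitly to select the DataStream overload.
val finished: DataStream[Long] = numbers.iterate(
  (iteration: DataStream[Long]) => {
    val minusOne = iteration.map(_ - 1)
    val stillPositive = minusOne.filter(_ > 0) // feedback stream
    val done = minusOne.filter(_ <= 0)         // output stream
    (stillPositive, done)
  },
  maxWaitTimeMillis = 5000L // head stops after 5 s without feedback
)
```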
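
### Processing Functions

Apply low-level custom processing logic. Besides each element, a `ProcessFunction` receives a `Context` that exposes the element's timestamp, a `TimerService`, and side outputs; as a rich function it also has access to the runtime context.
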
```scala { .api }
class DataStream[T] {
  /**
   * Apply a ProcessFunction for low-level processing
   * @param processFunction ProcessFunction implementation
   * @return DataStream with processed results
   */
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}
```
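
A minimal sketch of a `ProcessFunction` that emits at most one result per element; `LengthOfNonEmpty` is a hypothetical name. The `Context` argument is unused here; registering timers through it is only supported on keyed streams.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical function: emits the length of every non-empty string
class LengthOfNonEmpty extends ProcessFunction[String, Int] {
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, Int]#Context,
      out: Collector[Int]): Unit = {
    // Emit nothing for empty strings, one record otherwise
    if (value.nonEmpty) out.collect(value.length)
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment

val lengths: DataStream[Int] = env
  .fromElements("a", "", "abc")
  .process(new LengthOfNonEmpty)
```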
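
### Side Outputs

Retrieve additional output streams that a process function emits alongside its main output, for example to route malformed records elsewhere. Each side output is identified by an `OutputTag`.
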
```scala { .api }
class DataStream[T] {
  /**
   * Get a side output stream by tag; call this on the stream produced by the
   * operator that emits the side output
   * @param tag OutputTag identifying the side output
   * @return DataStream of side output elements
   */
  def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X]
}
```
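
A minimal sketch of the full side-output round trip: a `ProcessFunction` sends strings that fail to parse to a side output via `Context.output`, and `getSideOutput` is then called on the stream returned by `process`. The tag id `invalid-records` is arbitrary.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.util.Try

// Tag identifying the side output; its type parameter is the side output's element type
val invalidTag: OutputTag[String] = OutputTag[String]("invalid-records")

val env = StreamExecutionEnvironment.getExecutionEnvironment

val parsed: DataStream[Int] = env
  .fromElements("1", "two", "3")
  .process(new ProcessFunction[String, Int] {
    override def processElement(
        value: String,
        ctx: ProcessFunction[String, Int]#Context,
        out: Collector[Int]): Unit =
      Try(value.toInt).toOption match {
        case Some(n) => out.collect(n)                // main output
        case None    => ctx.output(invalidTag, value) // side output
      }
  })

// Must be called on the stream returned by process(), not on its input
val invalid: DataStream[String] = parsed.getSideOutput(invalidTag)
```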

### Advanced Operations

Low-level operations for custom stream processing.

```scala { .api }
class DataStream[T] {
  /**
   * Apply a custom stream operator
   * @param operatorName Name for the operator
   * @param operator Custom operator implementation
   * @return DataStream with custom transformation
   */
  def transform[R: TypeInformation](
      operatorName: String,
      operator: OneInputStreamOperator[T, R]
  ): DataStream[R]

  /**
   * Cache the intermediate result of this stream so that later jobs can reuse it
   * instead of recomputing it (bounded streams in batch execution mode only)
   * @return CachedDataStream for reuse
   */
  def cache(): CachedDataStream[T]
}
```

## Types

```scala { .api }
// Core function interfaces (Java interfaces in Flink, shown here as Scala traits)
trait MapFunction[T, R] {
  def map(value: T): R
}

trait FlatMapFunction[T, R] {
  def flatMap(value: T, out: Collector[R]): Unit
}

trait FilterFunction[T] {
  def filter(value: T): Boolean
}

// Key selector interface
trait KeySelector[T, K] {
  def getKey(value: T): K
}

// Partitioner interface: returns the target channel in [0, numPartitions)
trait Partitioner[K] {
  def partition(key: K, numPartitions: Int): Int
}

// Collector interface for output
trait Collector[T] {
  def collect(record: T): Unit
  def close(): Unit
}

// Output tag for side outputs
class OutputTag[T: TypeInformation](id: String) {
  def getTypeInfo: TypeInformation[T]
}

// Cached data stream
class CachedDataStream[T](dataStream: DataStream[T]) extends DataStream[T] {
  def invalidateCache(): Unit
}
```