# Apache Flink Scala API for DataStream Processing

The Apache Flink Scala API provides a type-safe, functional interface for building streaming data processing applications. It wraps Flink's Java DataStream API in Scala-friendly constructs, so developers can use functional programming patterns and expressive syntax for real-time stream processing. Exactly-once state consistency is available when checkpointing is enabled.

## Package Information

- **Package Name**: flink-streaming-scala_2.10
- **Package Type**: Maven
- **Language**: Scala 2.10
- **Installation**: `org.apache.flink:flink-streaming-scala_2.10:1.3.3`

## Core Imports

```scala
import org.apache.flink.streaming.api.scala._
```

For specific functionality:

```scala
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
```

## Basic Usage

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create a data stream from a collection
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Transform the data
val result = stream
  .filter(_ % 2 == 0)           // Keep even numbers
  .map(_ * 2)                   // Double each number
  .keyBy(identity)              // Key by value
  .timeWindow(Time.seconds(5))  // 5-second tumbling windows
  .sum(0)                       // Sum values in each window

// Add a sink to print results
result.print()

// Execute the streaming program
env.execute("Basic Flink Scala Example")
```

## Architecture

The Flink Scala API is built around several key components:

- **StreamExecutionEnvironment**: Entry point for creating and configuring streaming applications
- **DataStream**: Core abstraction representing a typed, unbounded stream of elements
- **KeyedStream**: A stream partitioned by key, enabling stateful operations and keyed transformations
- **WindowedStream**: A keyed stream grouped into time-based or count-based windows for aggregations over bounded intervals
- **Function Interfaces**: Type-safe interfaces for user-defined operations (transformations, aggregations, windows)
- **Type System Integration**: Automatic TypeInformation generation via Scala macros for serialization
- **State Management**: Managed, fault-tolerant state with exactly-once consistency guarantees
- **Time Processing**: Support for event time, processing time, and ingestion time with watermark handling

## Capabilities

### Stream Environment and Execution

Core functionality for creating and configuring Flink streaming applications, including environment setup, parallelism control, checkpointing, and program execution.

```scala { .api }
object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(): StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}

class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  // interval is in milliseconds; returns the environment so calls can be chained
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```

[Stream Environment and Execution](./stream-environment.md)
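As a minimal sketch of how these calls fit together, the following program configures an environment and runs a trivial pipeline. The object name, parallelism, and checkpoint interval are illustrative assumptions, not recommended settings.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; names and values are for illustration only.
object EnvironmentSetupSketch {
  def main(args: Array[String]): Unit = {
    // Picks a local or cluster environment depending on how the program is launched
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(2)            // default parallelism for operators in this job
    env.enableCheckpointing(10000L)  // request a checkpoint every 10 seconds (milliseconds)

    // A trivial pipeline so that there is something to execute
    env.fromElements("a", "b", "c").print()

    // Nothing runs until execute() is called
    env.execute("Environment Setup Sketch")
  }
}
```

The pipeline is only a description until `execute` is called, which is why the configuration calls can appear anywhere before it.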

### Data Sources and Stream Creation

Functionality for creating DataStreams from various sources, including collections, files, sockets, and custom source functions.

```scala { .api }
class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def readTextFile(filePath: String): DataStream[String]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]
  def generateSequence(from: Long, to: Long): DataStream[Long]
}
```

[Data Sources and Stream Creation](./data-sources.md)
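The sketch below shows one way the source methods might be combined, including a small custom `SourceFunction`. `CountingSource` and `SourcesSketch` are hypothetical names introduced here for illustration; they are not part of Flink.

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical source: emits 1..limit and then finishes.
class CountingSource(limit: Long) extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 1L
    while (running && i <= limit) {
      ctx.collect(i)
      i += 1
    }
  }

  // Called by Flink when the job is cancelled
  override def cancel(): Unit = {
    running = false
  }
}

object SourcesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val fromSeq: DataStream[String] = env.fromCollection(Seq("x", "y", "z"))
    val sequence: DataStream[Long]  = env.generateSequence(1L, 5L)
    val custom: DataStream[Long]    = env.addSource(new CountingSource(5L))

    fromSeq.print()
    sequence.print()
    custom.print()

    env.execute("Sources Sketch")
  }
}
```

All three sources here are finite, so the job ends once they are exhausted; a socket or message-queue source would keep it running.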

### Stream Transformations and Operations

Core stream processing operations, including element-wise transformations, filtering, and stateful processing with type-safe functional interfaces.

```scala { .api }
class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def filter(fun: T => Boolean): DataStream[T]
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}
```

[Stream Transformations and Operations](./stream-transformations.md)
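To illustrate how the element-wise operations chain, here is a small sketch that splits lines into words and pairs each word with its length. The input strings and the object name are made up for the example.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; the input data is illustrative.
object TransformSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val lines: DataStream[String] = env.fromElements("to be or not", "to be")

    val wordLengths: DataStream[(String, Int)] = lines
      .flatMap(_.toLowerCase.split("\\W+"))  // one line in, zero or more words out
      .filter(_.nonEmpty)                    // drop empty tokens
      .map(w => (w, w.length))               // one word in, one pair out

    wordLengths.print()
    env.execute("Transform Sketch")
  }
}
```

Each step returns a new `DataStream`, and the element type changes along the chain from `String` to `(String, Int)`.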

### Stream Partitioning and Distribution

Functionality for controlling how data is distributed across parallel operators, including key-based partitioning, broadcasting, and custom partitioning strategies.

```scala { .api }
class DataStream[T] {
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
  def keyBy(fields: Int*): KeyedStream[T, _]
  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, _]
  def broadcast: DataStream[T]
  def shuffle: DataStream[T]
  def rebalance: DataStream[T]
}
```

[Stream Partitioning and Distribution](./stream-partitioning.md)
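The following sketch contrasts the keying variants with a plain redistribution. The `Reading` case class and its fields are assumptions made for the example.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical record type used only in this sketch.
case class Reading(sensor: String, value: Double)

object PartitionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val readings = env.fromElements(
      Reading("a", 1.0), Reading("b", 2.0), Reading("a", 3.0))

    // Key selector function: the key type (String) is known at compile time
    val byFunction: KeyedStream[Reading, String] = readings.keyBy(_.sensor)

    // Field-name key: convenient, but the key type is not tracked statically
    val byField = readings.keyBy("sensor")

    // Round-robin redistribution, with no notion of a key
    val spread: DataStream[Reading] = readings.rebalance

    byFunction.print()
    byField.print()
    spread.print()

    env.execute("Partition Sketch")
  }
}
```

The function-based `keyBy` is usually the more convenient choice when later operators, such as window functions, need the key type.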

### Keyed Streams and Stateful Processing

Operations on partitioned streams, including stateful transformations, aggregations, and state management with exactly-once consistency.

```scala { .api }
class KeyedStream[T, K] {
  def reduce(fun: (T, T) => T): DataStream[T]
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
  def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
  def sum(position: Int): DataStream[T]
  def min(position: Int): DataStream[T]
  def max(position: Int): DataStream[T]
}
```

[Keyed Streams and Stateful Processing](./keyed-streams.md)
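As a sketch of the `mapWithState` signature above, the example below keeps a running total per key. The input tuples and the object name are illustrative assumptions.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; the input data is illustrative.
object RunningTotalSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val totals: DataStream[(String, Int)] = env
      .fromElements(("a", 1), ("b", 2), ("a", 3))
      .keyBy(_._1)
      // state is None for the first element seen for a key,
      // afterwards it holds whatever the previous call returned.
      .mapWithState((in: (String, Int), state: Option[Int]) => {
        val total = state.getOrElse(0) + in._2
        ((in._1, total), Some(total))  // (element to emit, new state)
      })

    totals.print()
    env.execute("Running Total Sketch")
  }
}
```

The function returns a pair: the element to emit and the state to keep for that key. Returning `None` as the second component clears the state.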

### Windowing and Time-Based Processing

Windowing functionality for both keyed and non-keyed streams, including time-based and count-based windows with custom triggers and evictors.

```scala { .api }
class KeyedStream[T, K] {
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}

class DataStream[T] {
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
}
```

[Windowing and Time-Based Processing](./windowing.md)
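The sketch below builds count, sliding time, and explicitly assigned windows over the same keyed stream. The data and window sizes are arbitrary. Note that with a short finite source such as `fromElements`, time-based windows may not fire before the job ends, so the time-window variants here mainly show how the calls are shaped.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical example object; data and window sizes are illustrative.
object WindowingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val keyed = env
      .fromElements(("a", 1), ("a", 2), ("b", 5), ("a", 3), ("a", 4))
      .keyBy(_._1)

    // Count window: fires for a key each time two of its elements have arrived
    val perTwo = keyed.countWindow(2).sum(1)

    // Sliding time window: 10 seconds long, evaluated every 5 seconds
    val sliding = keyed.timeWindow(Time.seconds(10), Time.seconds(5)).sum(1)

    // The same idea with an explicit window assigner
    val explicit = keyed
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .sum(1)

    perTwo.print()
    sliding.print()
    explicit.print()

    env.execute("Windowing Sketch")
  }
}
```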

### Window Operations and Aggregations

Operations that can be applied to windowed streams, including built-in aggregations, custom window functions, and incremental aggregations.

```scala { .api }
class WindowedStream[T, K, W <: Window] {
  def reduce(function: (T, T) => T): DataStream[T]
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  def sum(position: Int): DataStream[T]
  def min(position: Int): DataStream[T]
  def max(position: Int): DataStream[T]
}
```

[Window Operations and Aggregations](./window-operations.md)
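To contrast an incremental aggregation with a full-window function, the sketch below applies `reduce` and a custom `WindowFunction` to count windows over the same data. `CountPerKey` is a hypothetical class written for this example.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical window function: emits (key, number of elements in the window).
class CountPerKey
    extends WindowFunction[(String, Int), (String, Int), String, GlobalWindow] {
  override def apply(
      key: String,
      window: GlobalWindow,
      input: Iterable[(String, Int)],
      out: Collector[(String, Int)]): Unit = {
    out.collect((key, input.size))
  }
}

object WindowOperationsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val keyed = env
      .fromElements(("a", 1), ("a", 2), ("b", 5), ("b", 6))
      .keyBy(_._1)

    // Incremental: elements are combined as they arrive, only one value is kept
    val sums = keyed.countWindow(2).reduce((x, y) => (x._1, x._2 + y._2))

    // Full window: all elements are buffered and handed over when the window fires
    val counts = keyed.countWindow(2).apply(new CountPerKey)

    sums.print()
    counts.print()

    env.execute("Window Operations Sketch")
  }
}
```

`reduce` keeps less state per window, while a `WindowFunction` sees every element and the window itself.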

### Stream Composition and Joining

Stream composition operations, including union, connect, join, and co-group, for processing multiple streams together.

```scala { .api }
class DataStream[T] {
  def union(dataStreams: DataStream[T]*): DataStream[T]
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

class ConnectedStreams[IN1, IN2] {
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
```

[Stream Composition and Joining](./stream-composition.md)
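The difference between `union` and `connect` is easiest to see side by side. In this sketch (stream contents and names are illustrative), `union` merges streams of the same type, while `connect` pairs streams of different types and maps each side with its own function.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; the input data is illustrative.
object CompositionSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val evens: DataStream[Int]    = env.fromElements(2, 4, 6)
    val odds: DataStream[Int]     = env.fromElements(1, 3, 5)
    val words: DataStream[String] = env.fromElements("x", "y")

    // union: same element type on every input
    val numbers: DataStream[Int] = evens.union(odds)

    // connect: different element types, one mapping function per side,
    // both producing the same output type
    val labelled: DataStream[String] = numbers
      .connect(words)
      .map((n: Int) => s"number $n", (w: String) => s"word $w")

    labelled.print()
    env.execute("Composition Sketch")
  }
}
```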

### Asynchronous I/O Operations

Asynchronous I/O operations for external system integration, with configurable timeouts and result ordering.

```scala { .api }
object AsyncDataStream {
  def unorderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN],
    asyncFunction: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]

  def orderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN],
    asyncFunction: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
```

[Asynchronous I/O Operations](./async-operations.md)
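A rough sketch of the signatures above follows. It assumes that `AsyncFunction` and `AsyncCollector` live in `org.apache.flink.streaming.api.scala.async` and that `AsyncCollector` accepts either an `Iterable` of results or a `Throwable`. `UpperCaseLookup` is a hypothetical stand-in for a real external call.

```scala
import java.util.concurrent.TimeUnit

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction}

// Hypothetical async function: a Future stands in for a database or service call.
class UpperCaseLookup extends AsyncFunction[String, (String, String)] {
  override def asyncInvoke(
      input: String,
      collector: AsyncCollector[(String, String)]): Unit = {
    // Obtained inside the method so the function object itself stays serializable
    implicit val ec: ExecutionContext = ExecutionContext.global

    Future(input.toUpperCase).onComplete {
      case Success(value) => collector.collect(Iterable((input, value)))
      case Failure(error) => collector.collect(error)
    }
  }
}

object AsyncSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val input: DataStream[String] = env.fromElements("a", "b", "c")

    // Results are emitted as they complete, not necessarily in input order
    val results: DataStream[(String, String)] = AsyncDataStream.unorderedWait(
      input, new UpperCaseLookup, 1000L, TimeUnit.MILLISECONDS)

    results.print()
    env.execute("Async Sketch")
  }
}
```

`orderedWait` takes the same arguments and preserves input order at the cost of some latency.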

### Output Operations and Sinks

Output functionality for writing stream results to various destinations, including files, databases, message queues, and custom sinks.

```scala { .api }
class DataStream[T] {
  def print(): DataStreamSink[T]
  def writeAsText(path: String): DataStreamSink[T]
  def writeAsCsv(path: String): DataStreamSink[T]
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def addSink(fun: T => Unit): DataStreamSink[T]
}
```

[Output Operations and Sinks](./output-operations.md)
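The sketch below attaches three of these sinks to one stream. The output path is a hypothetical location under the JVM temporary directory, and the function sink simply prints, standing in for a call to an external system.

```scala
import org.apache.flink.streaming.api.scala._

// Hypothetical example object; the output path is illustrative.
object SinksSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val numbers: DataStream[Int] = env.fromElements(1, 2, 3)

    // Standard output of the task managers (or the IDE console when run locally)
    numbers.print()

    // Text output; the location is an assumption made for this sketch
    val outputPath = System.getProperty("java.io.tmpdir") + "/flink-sketch-output"
    numbers.writeAsText(outputPath)

    // Function sink: invoked once per element
    numbers.addSink((n: Int) => println(s"custom sink received $n"))

    env.execute("Sinks Sketch")
  }
}
```

A stream can feed any number of sinks; each one receives every element.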

### Function Interfaces and User-Defined Functions

Type-safe interfaces for implementing custom processing logic, including window functions, process functions, and rich functions with lifecycle management.

```scala { .api }
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
```

[Function Interfaces and User-Defined Functions](./function-interfaces.md)
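As an illustration of implementing one of these interfaces, the sketch below subclasses `ProcessWindowFunction` and overrides only `process`. `Summarize` is a hypothetical class. Whether `clear` also needs an override may depend on the Flink version, so it is left out here.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical function: emits one summary line per key and window.
class Summarize
    extends ProcessWindowFunction[(String, Int), String, String, GlobalWindow] {
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Int)],
      out: Collector[String]): Unit = {
    val total = elements.map(_._2).sum
    out.collect(s"key=$key count=${elements.size} total=$total")
  }
}

object FunctionInterfaceSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env
      .fromElements(("a", 1), ("a", 2), ("b", 5), ("b", 6))
      .keyBy(_._1)          // key type String matches the KEY type parameter
      .countWindow(2)       // GlobalWindow matches the W type parameter
      .process(new Summarize)
      .print()

    env.execute("Function Interface Sketch")
  }
}
```

The four type parameters must line up with the stream: element type, output type, key type from `keyBy`, and window type from the window assigner.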

### Scala Extensions and Partial Functions

Scala-specific extensions that accept partial functions, such as pattern-matching `case` literals, for more idiomatic Scala code.

```scala { .api }
import org.apache.flink.streaming.api.scala.extensions._

class OnDataStream[T] {
  def mapWith[R: TypeInformation](fun: PartialFunction[T, R]): DataStream[R]
  def filterWith(fun: PartialFunction[T, Boolean]): DataStream[T]
  def flatMapWith[R: TypeInformation](fun: PartialFunction[T, TraversableOnce[R]]): DataStream[R]
}
```

[Scala Extensions and Partial Functions](./scala-extensions.md)
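The practical benefit is being able to destructure elements with `case` directly in the transformation. A small sketch follows; the `Point` case class is a hypothetical type introduced for the example.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._

// Hypothetical record type used only in this sketch.
case class Point(x: Double, y: Double)

object ExtensionsSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val points = env.fromElements(Point(1.0, 2.0), Point(3.0, 4.0), Point(5.0, 6.0))

    points
      .filterWith { case Point(x, _) => x > 1.0 }     // pattern match instead of p => p.x > 1.0
      .mapWith { case Point(x, y) => (x, y, x + y) }  // destructure fields directly
      .print()

    env.execute("Extensions Sketch")
  }
}
```

The extension methods become available only after importing `org.apache.flink.streaming.api.scala.extensions._`.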

## Types

### Core Stream Types

```scala { .api }
class DataStream[T]
class KeyedStream[T, K] extends DataStream[T]
class ConnectedStreams[IN1, IN2]
class WindowedStream[T, K, W <: Window]
class AllWindowedStream[T, W <: Window]
class SplitStream[T] extends DataStream[T]
```

### Environment and Configuration Types

```scala { .api }
class StreamExecutionEnvironment
class ExecutionConfig
class CheckpointConfig
class RestartStrategies
```

### Function Interface Types

```scala { .api }
trait WindowFunction[IN, OUT, KEY, W <: Window]
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
trait AsyncFunction[IN, OUT]
trait AsyncCollector[OUT]
```

### Builder Types

```scala { .api }
class JoinedStreams[T1, T2]
class CoGroupedStreams[T1, T2]
class OutputTag[T]
```