Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.
# Flink Streaming Scala

Apache Flink Streaming Scala provides idiomatic Scala bindings for Apache Flink's streaming data processing capabilities. It wraps Flink's Java streaming API in Scala constructs such as Scala function literals, tuples and case classes, and implicit `TypeInformation` derivation. Developers can use it to build high-throughput, low-latency stream processing applications with familiar Scala syntax and compile-time type safety.

**⚠️ Deprecation Notice**: All Flink Scala APIs are deprecated since version 1.18.0 and will be removed in a future Flink major version. Users should migrate to the Java DataStream API or the Table API. Both can still be used to write applications in Scala. See [FLIP-265](https://s.apache.org/flip-265) for details.

## Package Information

- **Package Name**: flink-streaming-scala_2.12
- **Package Type**: maven
- **Language**: Scala
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-streaming-scala_2.12
- **Version**: 1.20.2
- **Installation**: Add the dependency to your Maven or SBT build

Maven:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>1.20.2</version>
</dependency>
```

SBT:

```scala
// %% appends the Scala binary version, so this resolves to
// flink-streaming-scala_2.12 only when scalaVersion is a 2.12.x release
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.20.2"
```

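## Core Imports

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function._
```

The wildcard import of `org.apache.flink.streaming.api.scala._` also brings the implicit `createTypeInformation` macro into scope. Most Scala API transformations need it to derive `TypeInformation` for their element types.

For specific functionality:

```scala
// createTypeInformation must be imported explicitly when the package wildcard is not used
import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, KeyedStream, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.functions.ProcessFunction
```
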
## Basic Usage

```scala
import org.apache.flink.streaming.api.scala._

// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create data source
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Transform data: double every value, then keep values greater than 5
val doubled = numbers
  .map(_ * 2)
  .filter(_ > 5)

// Output results (6, 8 and 10) to stdout; with parallelism > 1,
// each line is prefixed with the index of the subtask that printed it
doubled.print()

// Execute the job
env.execute("Simple Flink Job")
```

## Architecture

Flink Streaming Scala is built around several key components:

- **Execution Environment**: `StreamExecutionEnvironment` serves as the entry point for creating streaming applications
- **Data Streams**: `DataStream[T]` represents a typed, bounded or unbounded stream of records
- **Keyed Streams**: `KeyedStream[T, K]` partitions a stream by key, enabling per-key state and aggregations
- **Windowing**: Time- and count-based windows for bounded computations on infinite streams
- **Connectors**: Sources and sinks for various external systems
- **Processing Functions**: Low-level processing functions for complex event-driven logic
- **Async I/O**: Asynchronous requests to external services, so an operator can have many lookups in flight instead of blocking on each one

## Capabilities

### Execution Environment

`StreamExecutionEnvironment` is the entry point of every Flink streaming application. It holds job-wide settings such as parallelism and checkpointing, creates sources, and submits the assembled job when `execute()` is called. `getExecutionEnvironment` returns a local environment when the program runs in an IDE, and the cluster's environment when the program is submitted through the Flink CLI.

```scala { .api }
object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}

class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment // interval in milliseconds
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```

[Execution Environment](./execution-environment.md)

### Data Sources and Streams

Sources create a `DataStream[T]` from elements, collections, files, or sockets. Transformations such as `map`, `flatMap`, and `filter` derive new streams, and `keyBy` partitions a stream by key. Together, these operations are the foundation of every stream processing pipeline.

```scala { .api }
class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def readTextFile(filePath: String): DataStream[String]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
}

class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def filter(fun: T => Boolean): DataStream[T]
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
}
```

[Data Sources and Streams](./data-streams.md)

### Keyed Streams and Stateful Processing

A `KeyedStream[T, K]`, produced by `keyBy`, partitions records by key so that aggregations, per-key state, and windows operate on each key independently. It is the basis for stateful computations and windowed operations.

```scala { .api }
class KeyedStream[T, K] {
  def reduce(fun: (T, T) => T): DataStream[T]
  def sum(position: Int): DataStream[T]
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
  def process[R: TypeInformation](processFunction: KeyedProcessFunction[K, T, R]): DataStream[R]
  def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
}
```

[Keyed Streams and State](./keyed-streams.md)

### Windowing Operations

Windows split an unbounded keyed stream into finite groups, defined by time or by element count, that can then be aggregated. A `WindowedStream` is created by calling `window(...)` on a `KeyedStream`. `allowedLateness` keeps window state for an extra period after the watermark passes the end of the window, so that late elements can still update the result.

```scala { .api }
class WindowedStream[T, K, W <: Window] {
  def reduce(function: (T, T) => T): DataStream[T]
  def aggregate[ACC: TypeInformation, R: TypeInformation](aggregateFunction: AggregateFunction[T, ACC, R]): DataStream[R]
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  def allowedLateness(lateness: Time): WindowedStream[T, K, W]
}
```

[Windowing Operations](./windowing.md)

### Stream Connections and Joins

`union` merges streams of the same element type. `connect` pairs two streams whose element types may differ, so that they can be co-processed with shared state. `join` and `coGroup` combine two streams by key within windows.

```scala { .api }
class DataStream[T] {
  def union(dataStreams: DataStream[T]*): DataStream[T]
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

class ConnectedStreams[IN1, IN2] {
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
```

[Stream Connections and Joins](./stream-connections.md)

### Async I/O Operations

Async I/O lets an operator send requests to external systems, such as databases or web services, without blocking while it waits for each response. This makes it well suited to enriching a stream with external data. `unorderedWait` emits each result as soon as its request completes, while `orderedWait` emits results in input order. `timeout` bounds how long a single request may take.

```scala { .api }
object AsyncDataStream {
  def unorderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]

  def orderedWait[IN, OUT: TypeInformation](
      input: DataStream[IN],
      asyncFunction: AsyncFunction[IN, OUT],
      timeout: Long,
      timeUnit: TimeUnit
  ): DataStream[OUT]
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, resultFuture: ResultFuture[OUT]): Unit
}
```

[Async I/O Operations](./async-io.md)

### Processing Functions

Low-level processing functions for complex event-driven logic, with access to timers, state, and side outputs. The Scala API uses Flink's Java `ProcessFunction` and `KeyedProcessFunction` classes directly. Timers can be registered only in functions applied to a keyed stream, so `KeyedProcessFunction` is the usual choice for stateful, time-driven logic.

```scala { .api }
abstract class ProcessFunction[I, O] {
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}

abstract class KeyedProcessFunction[K, I, O] {
  def processElement(value: I, ctx: Context, out: Collector[O]): Unit
  def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[O]): Unit
}
```

[Processing Functions](./processing-functions.md)

### Window Functions

`WindowFunction` and `ProcessWindowFunction` receive all elements of a window at once, together with the key and the window itself. The `Context` of `ProcessWindowFunction` also exposes per-window and global keyed state. Its `clear` method is called when the window expires, so per-window state can be cleaned up there.

```scala { .api }
trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit
}
```

[Window Functions](./window-functions.md)

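The following sketch shows a Scala `ProcessWindowFunction` that reads window metadata through `context.window` and emits the window's start time along with the number of elements in it. `CountInWindow`, `WindowFunctionSketch`, and the sample events are hypothetical. As in the windowing sketch, the input is assumed to arrive in timestamp order.

```scala
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

// Hypothetical function: emits (key, window start in ms, element count) per window.
class CountInWindow
    extends ProcessWindowFunction[(String, Long), (String, Long, Int), String, TimeWindow] {
  override def process(
      key: String,
      context: Context,
      elements: Iterable[(String, Long)],
      out: Collector[(String, Long, Int)]): Unit =
    out.collect((key, context.window.getStart, elements.size))
}

object WindowFunctionSketch {
  def run(): List[(String, Long, Int)] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // (key, event timestamp in ms), already in timestamp order
    val events: DataStream[(String, Long)] = env
      .fromElements(("a", 1000L), ("a", 2000L), ("b", 3000L), ("a", 7000L))
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forMonotonousTimestamps[(String, Long)]()
          .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
            override def extractTimestamp(e: (String, Long), recordTimestamp: Long): Long = e._2
          }))

    val counts = events
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .process(new CountInWindow)

    val it = counts.executeAndCollect()
    try it.asScala.toList.sorted
    finally it.close()
  }
}
```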
### Sinks and Output

Sinks write a stream to external systems or to stdout. `sinkTo` takes a unified `Sink`, the interface used by current connectors, while `addSink` takes the older `SinkFunction`. `executeAndCollect` runs the job and streams its results back to the client, which is convenient for tests and debugging.

```scala { .api }
class DataStream[T] {
  def print(): DataStreamSink[T]
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def sinkTo(sink: Sink[T]): DataStreamSink[T]
  def executeAndCollect(): CloseableIterator[T]
}

trait SinkFunction[T] {
  def invoke(value: T, context: Context): Unit
}
```

[Sinks and Output](./sinks-output.md)

## Types

```scala { .api }
// Core execution types
class JobExecutionResult {
  def getNetRuntime: Long // net job runtime in milliseconds
  def getAccumulatorResult[V](accumulatorName: String): V
}

// Time type used by window assigners and allowedLateness
class Time(val duration: Long, val unit: TimeUnit)
object Time {
  def milliseconds(milliseconds: Long): Time
  def seconds(seconds: Long): Time
  def minutes(minutes: Long): Time
  def hours(hours: Long): Time
  def days(days: Long): Time
}

// Type information for Scala types
abstract class TypeInformation[T] {
  def getTypeClass: Class[T]
  def isBasicType: Boolean
  def isTupleType: Boolean
}

// Output tag for side outputs
case class OutputTag[T: TypeInformation](id: String)

// Iterator for collecting results (org.apache.flink.util.CloseableIterator)
trait CloseableIterator[T] extends java.util.Iterator[T] with AutoCloseable
```
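The following sketch shows how an `OutputTag` routes records to a side output from a `ProcessFunction`, and how `getSideOutput` retrieves them. `SideOutputSketch`, `SplitEvenOdd`, and the tag id `"odd-numbers"` are hypothetical. Both outputs are unioned so that a single `executeAndCollect` job observes all records. Local execution is assumed.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._

object SideOutputSketch {
  // Side-output tag; the id is an arbitrary illustrative name
  val oddTag: OutputTag[Int] = OutputTag[Int]("odd-numbers")

  // Even numbers go to the main output, odd numbers to the side output
  class SplitEvenOdd extends ProcessFunction[Int, Int] {
    override def processElement(
        value: Int,
        ctx: ProcessFunction[Int, Int]#Context,
        out: Collector[Int]): Unit =
      if (value % 2 == 0) out.collect(value) else ctx.output(oddTag, value)
  }

  def run(): List[String] = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val evens: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5).process(new SplitEvenOdd)
    val odds: DataStream[Int] = evens.getSideOutput(oddTag)

    // Union both outputs into one job so a single executeAndCollect sees everything
    val labelled = evens.map(n => s"even:$n").union(odds.map(n => s"odd:$n"))
    val it = labelled.executeAndCollect()
    try it.asScala.toList.sorted
    finally it.close()
  }
}
```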