Apache Flink Scala API providing type-safe operations and functional programming paradigms for distributed stream and batch processing applications.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala_2-12@1.20.00
# Flink Scala API
Apache Flink Scala API provides type-safe operations and functional programming paradigms for distributed stream and batch processing applications. It offers elegant Scala APIs with case class support, pattern matching, and functional composition patterns for building scalable data processing pipelines.

**⚠️ Deprecation Notice**: All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API.

## Package Information

- **Package Name**: flink-scala_2.12
- **Package Type**: maven
- **Language**: Scala
- **Installation**: Add to Maven pom.xml:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.12</artifactId>
  <version>1.20.2</version>
</dependency>
```
## Core Imports
```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
```
For specific functionality:
```scala
import org.apache.flink.api.scala.{DataSet, GroupedDataSet}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.api.scala.extensions._ // For partial function support
```
## Basic Usage
```scala
import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create dataset from elements
val data: DataSet[String] = env.fromElements("Hello", "World", "from", "Flink")

// Transform data
val wordCounts = data
  .flatMap(_.toLowerCase.split("\\W+"))
  .filter(_.nonEmpty)
  .map((_, 1))
  .groupBy(0)
  .sum(1)

// Output results. print() is an eager action in the DataSet API and triggers
// execution itself; env.execute("Word Count Example") is only needed when the
// program ends in lazy sinks such as writeAsText.
wordCounts.print()

// Example with partial functions (requires extensions import)
import org.apache.flink.api.scala.extensions._

case class Sale(region: String, product: String, amount: Double)
val sales = env.fromElements(
  Sale("US", "ProductA", 100.0),
  Sale("EU", "ProductA", 150.0)
)

val result = sales
  .filterWith { case Sale(region, _, _) => region == "US" }
  .mapWith { case Sale(_, product, amount) => (product, amount) }
  .groupingBy(_._1)
  .sum(1)
```
## Architecture
The Flink Scala API is built around these core concepts (a minimal sketch of the lazy-transformation / eager-action split follows the list):
- **ExecutionEnvironment**: Context for creating and executing Flink programs
- **DataSet**: Immutable collection representing distributed data
- **Transformations**: Operations like map, filter, join that create new DataSets
- **Actions**: Operations like collect, print that trigger execution
- **Type System**: Automatic TypeInformation generation for Scala types
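```scala
// Minimal sketch of the concepts above (names and values here are illustrative):
// transformations only build the dataflow graph, actions run it.
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Transformations are lazy: nothing is executed yet
val numbers: DataSet[Long] = env.generateSequence(1, 100)
val evens: DataSet[Long] = numbers.filter(_ % 2 == 0)

// Actions trigger execution of the graph built so far
val howMany: Long = evens.count()       // runs the job and returns a count
val values: Seq[Long] = evens.collect() // runs the job again and fetches the elements
```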
## Capabilities

### Execution Environment

The entry point for all Flink Scala programs, providing methods to create DataSets and configure execution.
```scala { .api }
object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def getConfig: ExecutionConfig

  // Data source creation
  def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String, /* ... */): DataSet[T]
  def generateSequence(from: Long, to: Long): DataSet[Long]

  // Execution
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  def executeAsync(): JobClient
}
```
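A short usage sketch; the parallelism value and the file path below are illustrative:

```scala
import org.apache.flink.api.scala._

// Picks up whichever environment the program is launched in (IDE, local or remote cluster)
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)

// A few ways of creating data sources
val fromSeq  = env.fromCollection(Seq(1, 2, 3, 4))
val sequence = env.generateSequence(1, 1000)
val lines    = env.readTextFile("/path/to/input.txt") // illustrative path

// print() is an eager action and runs the job; execute() is for programs ending in lazy sinks
sequence.filter(_ % 100 == 0).print()
```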
[Execution Environment](./execution-environment.md)

### DataSet Operations

Core data transformation and processing operations on distributed datasets.
```scala { .api }
class DataSet[T] {
  // Basic transformations
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filter(fun: T => Boolean): DataSet[T]
  def distinct(): DataSet[T]

  // Grouping
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def groupBy(fields: Int*): GroupedDataSet[T]

  // Aggregations
  def reduce(fun: (T, T) => T): DataSet[T]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]

  // Output operations
  def collect(): Seq[T]
  def print(): Unit
  def count(): Long
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = FileSystem.WriteMode.NO_OVERWRITE): DataSink[T]
}
```
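A minimal transformation pipeline over a case class (the data and names are illustrative):

```scala
import org.apache.flink.api.scala._

case class Purchase(customer: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val purchases = env.fromElements(
  Purchase("alice", 20.0), Purchase("bob", 15.0), Purchase("alice", 5.0))

// Map to (customer, amount) tuples, then group and aggregate by field position
val totalPerCustomer = purchases
  .map(p => (p.customer, p.amount))
  .groupBy(0) // first tuple field
  .sum(1)     // second tuple field

totalPerCustomer.print() // eager action: runs the job and prints the result
```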
[DataSet Operations](./dataset-operations.md)

### Join Operations

Joining datasets on keys with various join types and optimization hints.
```scala { .api }
class DataSet[T] {
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
}

class UnfinishedJoinOperation[L, R] {
  def where[K: TypeInformation](fun: L => K): HalfUnfinishedJoinOperation[L, R]
  def where(fields: Int*): HalfUnfinishedJoinOperation[L, R]
}

class JoinDataSet[L, R] {
  def apply[O: TypeInformation: ClassTag](fun: (L, R) => O): DataSet[O]
}
```
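A sketch of an inner and a left outer join using key selector functions (the case classes and values are illustrative):

```scala
import org.apache.flink.api.scala._

case class User(id: Int, name: String)
case class Click(userId: Int, url: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val users  = env.fromElements(User(1, "alice"), User(2, "bob"))
val clicks = env.fromElements(Click(1, "/home"), Click(1, "/docs"), Click(3, "/about"))

// Inner join: where() picks the key on the left side, equalTo() on the right side
val names: DataSet[(String, String)] = users
  .join(clicks)
  .where(_.id)
  .equalTo(_.userId)
  .apply((u, c) => (u.name, c.url))

// Outer joins require a join function; the side without a match is null
val clicksPerUser = users
  .leftOuterJoin(clicks)
  .where(_.id)
  .equalTo(_.userId)
  .apply((u, c) => (u.name, Option(c).map(_.url)))
```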
[Join Operations](./join-operations.md)

### Grouped DataSet Operations

Operations available on grouped datasets including sorting and specialized aggregations.
```scala { .api }
class GroupedDataSet[T] {
  def sortGroup(field: Int, order: Order): GroupedDataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
  def sum(field: Int): AggregateDataSet[T]
  def max(field: Int): AggregateDataSet[T]
  def min(field: Int): AggregateDataSet[T]
  def maxBy(fields: Int*): DataSet[T]
  def minBy(fields: Int*): DataSet[T]
  def first(n: Int): DataSet[T]
}
```
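A sketch of within-group sorting and a group reduce; the event data is illustrative and the sort uses the key-selector overload of `sortGroup`:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

case class Event(user: String, ts: Long, page: String)

val env = ExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(
  Event("alice", 2L, "/docs"), Event("alice", 1L, "/home"), Event("bob", 5L, "/home"))

// First event per user, sorted by timestamp inside each group
val firstVisit: DataSet[Event] = events
  .groupBy(_.user)
  .sortGroup(_.ts, Order.ASCENDING)
  .first(1)

// Group reduce: count the events of each user
val eventsPerUser: DataSet[(String, Int)] = events
  .groupBy(_.user)
  .reduceGroup { it =>
    val all = it.toSeq
    (all.head.user, all.size)
  }
```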
[Grouped DataSet Operations](./grouped-dataset-operations.md)

### Type System

Scala-specific type information system for serialization and type safety.
```scala { .api }
object Types {
  // Basic types
  val STRING: TypeInformation[String]
  val INT: TypeInformation[Int]
  val LONG: TypeInformation[Long]
  val DOUBLE: TypeInformation[Double]
  val BOOLEAN: TypeInformation[Boolean]

  // Factory methods
  def of[T: TypeInformation]: TypeInformation[T]
  def TUPLE[T: TypeInformation]: TypeInformation[T]
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
  def OPTION[A, T <: Option[A]](valueType: TypeInformation[A]): TypeInformation[T]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
}

// Implicit type information generation (macro-based)
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
```
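A sketch of deriving TypeInformation explicitly; the case class is illustrative:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.typeinfo.TypeInformation

case class Sensor(id: String, reading: Double)

// The createTypeInformation macro derives TypeInformation for Scala types such as
// case classes, tuples and Options; the same mechanism satisfies the implicit
// TypeInformation context bounds on operations like map and fromElements.
val sensorInfo: TypeInformation[Sensor]         = createTypeInformation[Sensor]
val tupleInfo: TypeInformation[(String, Int)]   = createTypeInformation[(String, Int)]
val optionInfo: TypeInformation[Option[Double]] = createTypeInformation[Option[Double]]
```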
[Type System](./type-system.md)

### Partitioned and Sorted DataSets

Specialized DataSet types for partitioned and sorted data operations.
```scala { .api }
class PartitionSortedDataSet[T] extends DataSet[T] {
  def sortPartition(field: Int, order: Order): DataSet[T]
  def sortPartition(field: String, order: Order): DataSet[T]
  // Note: Cannot chain key selector functions
}
```
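A sketch of partition-local sorting; field-position sorts can be chained for secondary ordering (the data is illustrative):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.operators.Order

val env = ExecutionEnvironment.getExecutionEnvironment
val pairs = env.fromElements((3, "c"), (1, "a"), (2, "b"), (1, "d"))

// Sort within each parallel partition by the first field, then by the second
val sorted = pairs
  .sortPartition(0, Order.ASCENDING)
  .sortPartition(1, Order.DESCENDING)

sorted.print()
```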
### Extension Methods for Partial Functions

Scala-friendly extension methods that accept partial functions for pattern matching.
```scala { .api }
// Import extensions
import org.apache.flink.api.scala.extensions._

class OnDataSet[T] {
  def mapWith[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def mapPartitionWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def flatMapWith[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filterWith(fun: T => Boolean): DataSet[T]
  def reduceWith(fun: (T, T) => T): DataSet[T]
  def reduceGroupWith[R: TypeInformation: ClassTag](fun: Stream[T] => R): DataSet[R]
  def groupingBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
}
```
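A sketch beyond the Basic Usage example, showing pattern-matching functions passed to `flatMapWith`, `mapWith`, and `reduceWith` (the reading data is illustrative):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

case class Reading(sensor: String, values: Seq[Double])

val env = ExecutionEnvironment.getExecutionEnvironment
val readings = env.fromElements(
  Reading("a", Seq(1.0, 3.0)), Reading("b", Seq(2.0)))

// Destructure the case class directly in the transformation
val perValue: DataSet[(String, Double)] = readings
  .flatMapWith { case Reading(sensor, values) => values.map(v => (sensor, v)) }

// reduceWith combines two elements at a time
val grandTotal: DataSet[(String, Double)] = perValue
  .mapWith { case (_, v) => ("total", v) }
  .reduceWith { case ((k, a), (_, b)) => (k, a + b) }

grandTotal.print()
```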
### Utility Functions

Additional utilities for DataSet operations including sampling, partitioning, and indexing.
```scala { .api }
// Import utilities
import org.apache.flink.api.scala.utils._

class DataSet[T] {
  // Available via implicit conversions from utils package
  def countElementsPerPartition(): DataSet[(Int, Long)]
  def zipWithIndex(): DataSet[(Long, T)]
  def zipWithUniqueId(): DataSet[(Long, T)]
  def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.RNG.nextLong()): DataSet[T]
  def sampleWithSize(withReplacement: Boolean, numSamples: Int, seed: Long = Utils.RNG.nextLong()): DataSet[T]
  def checksumHashCode(): ChecksumHashCode
}
```
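A short sketch of the zipping and sampling helpers; the sample fraction is arbitrary:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // enables the extra methods via implicits

val env = ExecutionEnvironment.getExecutionEnvironment
val words = env.fromElements("alpha", "beta", "gamma", "delta")

// Attach a consecutive index to every element
val indexed: DataSet[(Long, String)] = words.zipWithIndex

// Random ~50% sample without replacement
val sampled: DataSet[String] = words.sample(withReplacement = false, fraction = 0.5)

indexed.print()
```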
[Utility Functions](./utility-functions.md)

## Common Types
```scala { .api }
// Execution configuration
class ExecutionConfig {
  def setParallelism(parallelism: Int): ExecutionConfig
  def getParallelism: Int
  def enableClosureCleaner(): ExecutionConfig
  def disableClosureCleaner(): ExecutionConfig
}

// Job execution result
class JobExecutionResult {
  def getNetRuntime: Long // job runtime in milliseconds
  def getAccumulatorResult[T](accumulatorName: String): T
}

// Aggregation types
object Aggregations extends Enumeration {
  val SUM, MAX, MIN = Value
}

// File system write modes
object FileSystem {
  object WriteMode extends Enumeration {
    val NO_OVERWRITE, OVERWRITE = Value
  }
}

// Ordering for sorting
object Order extends Enumeration {
  val ASCENDING, DESCENDING = Value
}
```
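A sketch tying these types together; the output path and job name are illustrative:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

val env = ExecutionEnvironment.getExecutionEnvironment

// ExecutionConfig is reached through the environment
env.getConfig.setParallelism(2)

val data = env.fromElements(1, 2, 3)

// The write mode controls what happens when the output path already exists
data.writeAsText("/tmp/flink-out", WriteMode.OVERWRITE) // illustrative path

// writeAsText is a lazy sink, so the job must be executed explicitly
val result = env.execute("write example")
println(s"Ran for ${result.getNetRuntime} ms")
```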
## Error Handling
The Flink Scala API can throw these exceptions (a minimal defensive-handling sketch, with illustrative paths and names, follows the list):
- `IllegalArgumentException` - Invalid parameters or field names
- `UnsupportedOperationException` - Unsupported operations on certain data types
- `RuntimeException` - Runtime execution errors
- `IOException` - File I/O related errors
- `JobExecutionException` - Job execution failures
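```scala
// Minimal defensive handling around job submission (paths and the job name
// below are illustrative; JobExecutionException is defined in flink-runtime).
import org.apache.flink.api.scala._
import org.apache.flink.runtime.client.JobExecutionException

val env = ExecutionEnvironment.getExecutionEnvironment

try {
  env.readTextFile("/data/input.txt")    // illustrative path
    .map(_.length)
    .writeAsText("/data/line-lengths")   // illustrative path
  env.execute("line lengths")
} catch {
  case e: JobExecutionException =>
    // The submitted job failed during execution
    println(s"Job failed: ${e.getMessage}")
  case e: java.io.IOException =>
    // File system problems while reading or writing
    println(s"I/O error: ${e.getMessage}")
}
```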