# Apache Flink Scala API

The Apache Flink Scala API provides fluent Scala bindings for Flink's distributed stream and batch processing framework. This module enables Scala developers to write type-safe data processing applications using idiomatic Scala constructs, including case classes, pattern matching, and functional programming patterns.
## Package Information

- **Package Name**: flink-scala_2.11
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: Add to your Maven `pom.xml`:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
```
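If you build with sbt instead, the equivalent dependency uses `%%`, which appends the configured Scala binary version (2.11 for this artifact):

```scala
// sbt build definition; %% resolves to flink-scala_2.11 when
// scalaVersion is set to a 2.11.x release
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.14.6"
```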
## Core Imports

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.ExecutionEnvironment
```

For type utilities:

```scala
import org.apache.flink.api.scala.typeutils.Types
```

For extension methods (partial function support):

```scala
import org.apache.flink.api.scala.extensions._
```
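With the extensions import in scope, variants such as `mapWith` accept function literals that destructure tuples and case classes directly. A minimal sketch:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.extensions._

val env = ExecutionEnvironment.getExecutionEnvironment
val pairs = env.fromElements((1, "a"), (2, "b"), (3, "c"))

// mapWith takes a (partial) function literal, so the tuple can be
// destructured in the case pattern instead of using ._1 / ._2
val labeled = pairs.mapWith {
  case (id, name) => s"$id-$name"
}
```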
## Basic Usage

```scala
import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create DataSet from collection
val data = env.fromCollection(List(1, 2, 3, 4, 5))

// Transform data
val result = data
  .filter(_ > 2)
  .map(_ * 2)
  .reduce(_ + _)

// Execute and get result
println(result.collect().head) // Prints: 24 ((3 + 4 + 5) * 2)
```
## Architecture

The Flink Scala API is built around several key components:

- **ExecutionEnvironment**: Entry point for creating and configuring Flink programs
- **DataSet[T]**: Core abstraction representing distributed collections with type safety
- **Type System**: Automatic type information generation via Scala macros
- **Serialization Framework**: Specialized serializers for Scala types (Option, Either, Try, case classes)
- **Fluent API**: Method chaining for building complex data processing pipelines
- **Extensions**: Partial function support for pattern matching in transformations
## Capabilities

### Execution Environment

Environment setup, data source creation, and job execution management. The primary entry point for all Flink programs.

```scala { .api }
object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
  def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
  def readTextFile(filePath: String): DataSet[String]
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```
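A minimal setup sketch; the parallelism value and sample data are arbitrary:

```scala
import org.apache.flink.api.scala._

// Picks a local environment inside the IDE and the cluster
// environment when the job is submitted to a running Flink cluster
val env = ExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)

val words = env.fromElements("flink", "scala", "api")
words.print() // print() triggers execution for this sink
```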
[Execution Environment](./execution-environment.md)
### Data Transformations

Core data processing operations including map, filter, reduce, and aggregations. The heart of Flink's data processing capabilities.

```scala { .api }
class DataSet[T] {
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filter(fun: T => Boolean): DataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def union(other: DataSet[T]): DataSet[T]
}
```
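A short sketch chaining these transformations; the `Event` case class is hypothetical:

```scala
import org.apache.flink.api.scala._

// Hypothetical record type for illustration
case class Event(user: String, clicks: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(Event("alice", 3), Event("bob", 1), Event("alice", 2))

// Keep events with more than one click, extract the counts, sum them
val totalClicks = events
  .filter(_.clicks > 1)
  .map(_.clicks)
  .reduce(_ + _)
```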
[Data Transformations](./data-transformations.md)
### Grouping and Aggregation

Group-wise operations and aggregation functions for summarizing and analyzing grouped data.

```scala { .api }
class GroupedDataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def reduceGroup[R: TypeInformation: ClassTag](fun: Iterator[T] => R): DataSet[R]
  def sum(field: String): AggregateDataSet[T]
  def max(field: String): AggregateDataSet[T]
  def min(field: String): AggregateDataSet[T]
  def aggregate(agg: Aggregations, field: String): AggregateDataSet[T]
}
```
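Building on the hypothetical `Event` type from above, grouping by a key selector and aggregating a case class field by name:

```scala
import org.apache.flink.api.scala._

case class Event(user: String, clicks: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(Event("alice", 3), Event("bob", 1), Event("alice", 2))

// groupBy takes a key selector; sum references the field by name
val clicksPerUser = events
  .groupBy(_.user)
  .sum("clicks") // alice -> 5, bob -> 1
```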
[Grouping and Aggregation](./grouping-aggregation.md)
### Binary Operations

Join, cross, and coGroup operations for combining multiple DataSets.

```scala { .api }
class DataSet[T] {
  def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def leftOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def rightOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def fullOuterJoin[O](other: DataSet[O]): UnfinishedOuterJoinOperation[T, O]
  def cross[O](other: DataSet[O]): CrossDataSet[T, O]
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
}
```
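Joins are finished with `where`/`equalTo` key selectors. A sketch with hypothetical `User` and `Purchase` types:

```scala
import org.apache.flink.api.scala._

case class User(id: Int, name: String)
case class Purchase(userId: Int, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment
val users = env.fromElements(User(1, "alice"), User(2, "bob"))
val purchases = env.fromElements(Purchase(1, 9.99), Purchase(1, 4.50))

// where selects the key on the left side, equalTo on the right;
// the result pairs each matching (User, Purchase)
val joined = users
  .join(purchases)
  .where(_.id)
  .equalTo(_.userId)
  .map { case (user, purchase) => (user.name, purchase.amount) }
```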
[Binary Operations](./binary-operations.md)
### Type System and Serialization

Comprehensive type information system and Scala-specific serialization support.

```scala { .api }
object Types {
  def of[T: TypeInformation]: TypeInformation[T]
  def OPTION[A](valueType: TypeInformation[A]): TypeInformation[Option[A]]
  def EITHER[A, B](leftType: TypeInformation[A], rightType: TypeInformation[B]): TypeInformation[Either[A, B]]
  def TRY[A](valueType: TypeInformation[A]): TypeInformation[Try[A]]
  def CASE_CLASS[T: TypeInformation]: TypeInformation[T]
}

// Automatic type information generation via macro
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
```
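The macro is applied implicitly wherever `org.apache.flink.api.scala._` is in scope; it can also be invoked explicitly. A sketch with a hypothetical case class:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._

case class Reading(sensor: String, value: Option[Double])

// The createTypeInformation macro derives type information (and hence
// serializers) for case classes and Scala types such as Option
val info: TypeInformation[Reading] = createTypeInformation[Reading]
```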
[Type System](./type-system.md)
### Partitioning and Distribution

Control over data distribution and partitioning strategies across the cluster.

```scala { .api }
class DataSet[T] {
  def partitionByHash[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionByRange[K: TypeInformation](fun: T => K): DataSet[T]
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataSet[T]
  def rebalance(): DataSet[T]
  def sortPartition[K: TypeInformation](fun: T => K, order: Order): DataSet[T]
}
```
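A sketch that hash-partitions by key and then sorts within each partition; the `Event` type is again hypothetical:

```scala
import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

case class Event(user: String, clicks: Int)

val env = ExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(Event("alice", 3), Event("bob", 1))

// All events for a user land in the same partition; within each
// partition, records are sorted by clicks in descending order
val partitioned = events
  .partitionByHash(_.user)
  .sortPartition(_.clicks, Order.DESCENDING)
```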
[Partitioning and Distribution](./partitioning-distribution.md)
### Input and Output Operations

Reading data from various sources and writing results to different sinks.

```scala { .api }
class ExecutionEnvironment {
  def readTextFile(filePath: String): DataSet[String]
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

class DataSet[T] {
  def writeAsText(filePath: String, writeMode: FileSystem.WriteMode = null): DataSink[T]
  def writeAsCsv(filePath: String): DataSink[T]
  def print(): Unit
  def collect(): Seq[T]
}
```
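A read-transform-write sketch; the paths and the `Rating` case class are placeholders:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem

case class Rating(movieId: Int, score: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// readCsvFile maps CSV columns onto the case class fields in order
val ratings = env.readCsvFile[Rating]("/path/to/ratings.csv")

val highScores = ratings.filter(_.score >= 4.0)
highScores.writeAsText("/path/to/output", FileSystem.WriteMode.OVERWRITE)

// File sinks (unlike print()/collect()) need an explicit execute()
env.execute("filter-ratings")
```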
[Input and Output](./input-output.md)
### Utility Functions

Advanced utilities for sampling, indexing, and data analysis.

```scala { .api }
implicit class DataSetUtils[T](dataSet: DataSet[T]) {
  def zipWithIndex: DataSet[(Long, T)]
  def zipWithUniqueId: DataSet[(Long, T)]
  def sample(withReplacement: Boolean, fraction: Double): DataSet[T]
  def countElementsPerPartition: DataSet[(Int, Long)]
}
```
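These become available on any DataSet after importing `org.apache.flink.api.scala.utils._`. A short sketch:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._

val env = ExecutionEnvironment.getExecutionEnvironment
val names = env.fromElements("a", "b", "c")

// Assigns each element a unique, consecutive index starting at 0
val indexed = names.zipWithIndex

// Samples roughly 50% of the elements, without replacement
val sampled = names.sample(withReplacement = false, fraction = 0.5)
```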
[Utilities](./utilities.md)
## Types

### Core Types

```scala { .api }
trait TypeInformation[T] {
  def getTypeClass: Class[T]
  def createSerializer(config: ExecutionConfig): TypeSerializer[T]
}

class JobExecutionResult {
  def getJobID: JobID
  def getNetRuntime: Long
  def getNetRuntime(timeUnit: TimeUnit): Long
}

sealed trait Order
object Order {
  case object ASCENDING extends Order
  case object DESCENDING extends Order
}

abstract class Partitioner[T] {
  def partition(key: T, numPartitions: Int): Int
}
```
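A custom `Partitioner` plugs into `partitionCustom` (see the Partitioning capability above); a minimal sketch with a hypothetical modulo partitioner:

```scala
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

// Routes non-negative Int keys by modulo; must be deterministic
class ModPartitioner extends Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int =
    key % numPartitions
}

val env = ExecutionEnvironment.getExecutionEnvironment
val nums = env.fromElements(1, 2, 3, 4, 5)

// The second argument extracts the key handed to the partitioner
val custom = nums.partitionCustom(new ModPartitioner, (n: Int) => n)
```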