Apache Flink Scala API providing type-safe distributed stream and batch processing with idiomatic Scala constructs, functional programming features, and seamless runtime integration.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-scala_2-10@1.3.3
# Apache Flink Scala API

Apache Flink Scala API provides type-safe distributed stream and batch processing with idiomatic Scala constructs, functional programming features, and seamless runtime integration. It enables Scala developers to write data processing applications using Flink's powerful streaming and batch processing capabilities with native Scala types, case classes, pattern matching, and functional transformations.

## Package Information

- **Package Name**: org.apache.flink:flink-scala_2.10
- **Package Type**: maven
- **Language**: Scala (2.10)
- **Version**: 1.3.3
- **Installation**: Add to your `pom.xml`:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.10</artifactId>
  <version>1.3.3</version>
</dependency>
```

## Core Imports

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.{ExecutionEnvironment, DataSet}
```

## Basic Usage

```scala
import org.apache.flink.api.scala._

// Create execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// Create a DataSet from a collection
val data = env.fromElements(1, 2, 3, 4, 5)

// Transform the data
val result = data
  .map(_ * 2)
  .filter(_ > 4)

// Execute and collect results
result.print()
```

## Architecture

The Flink Scala API is built around several core abstractions:

- **ExecutionEnvironment**: Entry point for creating and executing Flink batch programs
- **DataSet[T]**: Immutable distributed collection with type-safe transformations
- **GroupedDataSet[T]**: DataSet grouped by key for aggregation operations
- **Type System**: Comprehensive TypeInformation system for Scala types including case classes, Option, Either, Try

## Capabilities

### Execution Environment

The ExecutionEnvironment is the main entry point for Flink batch programs, providing data source creation and execution control.

```scala { .api }
object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
}

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
```

[Execution Environment](./execution-environment.md)

### Data Sources and Sinks

Create DataSets from various sources and write results to different output formats.

```scala { .api }
class ExecutionEnvironment {
  def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def generateSequence(from: Long, to: Long): DataSet[Long]
}

class DataSet[T] {
  def writeAsText(filePath: String): DataSink[T]
  def print(): DataSink[T]
  def collect(): Seq[T]
}
```

[Data Sources and Sinks](./data-sources-sinks.md)

### Transformations

Core transformation operations for processing data with functional programming patterns.

```scala { .api }
class DataSet[T] {
  def map[R: TypeInformation: ClassTag](fun: T => R): DataSet[R]
  def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataSet[R]
  def filter(fun: T => Boolean): DataSet[T]
  def reduce(fun: (T, T) => T): DataSet[T]
  def distinct(): DataSet[T]
  def union(other: DataSet[T]*): DataSet[T]
}
```

[Transformations](./transformations.md)

### Grouping and Aggregation

Group data by keys and perform aggregation operations with type-safe field access.

```scala { .api }
class DataSet[T] {
  def groupBy[K: TypeInformation](fun: T => K): GroupedDataSet[T]
  def groupBy(fields: Int*): GroupedDataSet[T]
  def groupBy(firstField: String, otherFields: String*): GroupedDataSet[T]
}

class GroupedDataSet[T] {
  def reduce(fun: (T, T) => T): DataSet[T]
  def sum(field: Int): DataSet[T]
  def max(field: Int): DataSet[T]
  def min(field: Int): DataSet[T]
  def aggregate(agg: Aggregations, field: Int): AggregateDataSet[T]
}
```

[Grouping and Aggregation](./grouping-aggregation.md)

### Joins and CoGroups

Combine multiple DataSets using joins, co-groups, and cross products with flexible key selection.

```scala { .api }
class DataSet[T] {
  def join[O: ClassTag](other: DataSet[O]): UnfinishedJoinOperation[T, O]
  def coGroup[O: ClassTag](other: DataSet[O]): UnfinishedCoGroupOperation[T, O]
  def cross[O: ClassTag](other: DataSet[O]): CrossDataSet[T, O]
}

class UnfinishedJoinOperation[T, O] {
  def where[K: TypeInformation](keySelector: T => K): UnfinishedJoinOperationWhere[T, O]
  def where(fields: Int*): UnfinishedJoinOperationWhere[T, O]
}
```

[Joins and CoGroups](./joins-cogroups.md)

### Iterations

Support for iterative algorithms with both bulk iteration and delta iteration patterns.

```scala { .api }
class DataSet[T] {
  def iterate(maxIterations: Int)(stepFunction: DataSet[T] => DataSet[T]): DataSet[T]
  def iterateWithTermination(maxIterations: Int)(stepFunction: DataSet[T] => (DataSet[T], DataSet[_])): DataSet[T]
  def iterateDelta[R: TypeInformation: ClassTag](
    workset: DataSet[R],
    maxIterations: Int,
    keyFields: Array[String]
  )(stepFunction: (DataSet[T], DataSet[R]) => (DataSet[T], DataSet[R])): DataSet[T]
}
```

[Iterations](./iterations.md)

### Type System and Serialization

Comprehensive type information system supporting Scala types with macro-based code generation.

```scala { .api }
// Implicit type information generation
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]

// Core type information classes
class CaseClassTypeInfo[T] extends TypeInformation[T]
class OptionTypeInfo[T] extends TypeInformation[Option[T]]
class EitherTypeInfo[A, B] extends TypeInformation[Either[A, B]]
class TryTypeInfo[T] extends TypeInformation[Try[T]]
```

[Type System](./type-system.md)

### Hadoop Integration

Native integration with Hadoop MapReduce and MapRed input/output formats.

```scala { .api }
class ExecutionEnvironment {
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapreduceInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    inputPath: String
  ): DataSet[(K, V)]
}
```

[Hadoop Integration](./hadoop-integration.md)

## Types

```scala { .api }
// Core execution types
class ExecutionEnvironment
class DataSet[T]
class GroupedDataSet[T]
class CrossDataSet[T, O]
class AggregateDataSet[T]
class CoGroupDataSet[T, O]

// Configuration and results
class ExecutionConfig
class JobExecutionResult
class JobID

// Resource management
class ResourceSpec

// Type information
abstract class TypeInformation[T]
class CaseClassTypeInfo[T] extends TypeInformation[T]
class OptionTypeInfo[T] extends TypeInformation[Option[T]]
class EitherTypeInfo[A, B] extends TypeInformation[Either[A, B]]

// Serialization
abstract class TypeSerializer[T]
class CaseClassSerializer[T] extends TypeSerializer[T]

// Operators and functions
trait MapFunction[T, O]
trait FlatMapFunction[T, O]
trait FilterFunction[T]
trait ReduceFunction[T]
trait GroupReduceFunction[T, O]
trait JoinFunction[T, O, R]
trait CoGroupFunction[T, O, R]
trait CrossFunction[T, O, R]

// Aggregation types
class Aggregations
class Order

// Join operations
class UnfinishedJoinOperation[T, O]
class UnfinishedJoinOperationWhere[T, O]
class UnfinishedJoinOperationWhereEqual[T, O]
class JoinDataSet[T, O]

// Output types
class DataSink[T]
trait OutputFormat[T]
class TextOutputFormat[T] extends OutputFormat[T]

// Input types
trait InputFormat[T, S]
class TextInputFormat extends InputFormat[String, FileInputSplit]

// Utilities
object DataSetUtils
class ScalaGauge[T]

// Accumulator types
trait Accumulator[V, R]
class IntCounter extends Accumulator[Int, Int]
class LongCounter extends Accumulator[Long, Long]
class DoubleCounter extends Accumulator[Double, Double]
class Histogram extends Accumulator[Int, java.util.Map[Int, Int]]
class ListAccumulator[T] extends Accumulator[T, java.util.ArrayList[T]]
class IntMaximum extends Accumulator[Int, Int]
class IntMinimum extends Accumulator[Int, Int]
class DoubleMaximum extends Accumulator[Double, Double]
class DoubleMinimum extends Accumulator[Double, Double]

// Rich functions
trait RichFunction
abstract class RichMapFunction[T, O] extends MapFunction[T, O] with RichFunction
abstract class RichFlatMapFunction[T, O] extends FlatMapFunction[T, O] with RichFunction
abstract class RichFilterFunction[T] extends FilterFunction[T] with RichFunction
abstract class RichReduceFunction[T] extends ReduceFunction[T] with RichFunction
abstract class RichGroupReduceFunction[T, O] extends GroupReduceFunction[T, O] with RichFunction
abstract class RichMapPartitionFunction[T, O] extends MapPartitionFunction[T, O] with RichFunction

// Runtime context
trait RuntimeContext
trait BroadcastVariableInitializer[T, C]

// Configuration
class Configuration
```