# Execution Environment

The ExecutionEnvironment is the entry point for all Flink Scala programs. It provides the context for creating DataSets and configuring execution parameters.

## Creating Execution Environments

```scala { .api }
object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
  def createCollectionsEnvironment: ExecutionEnvironment
  def createLocalEnvironmentWithWebUI(config: Configuration): ExecutionEnvironment
}
```

### Usage Examples

```scala
import org.apache.flink.api.scala._

// Get default execution environment (local or cluster based on context)
val env = ExecutionEnvironment.getExecutionEnvironment

// Create local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(4)

// Create remote environment
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  host = "jobmanager",
  port = 8081,
  jarFiles = "myapp.jar"
)

// Create collections-based environment (for testing)
val collEnv = ExecutionEnvironment.createCollectionsEnvironment
```

## Configuration

```scala { .api }
class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def getConfig: ExecutionConfig
  def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
  def registerJobListener(jobListener: JobListener): Unit
  def configure(configuration: Configuration, classLoader: ClassLoader): Unit
}
```

### Configuration Examples

```scala
val env = ExecutionEnvironment.getExecutionEnvironment

// Set parallelism
env.setParallelism(8)

// Configure execution settings
val config = env.getConfig
config.enableClosureCleaner()
config.setGlobalJobParameters(ParameterTool.fromArgs(args))

// Set restart strategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
```

## Data Source Creation

### From Collections and Elements

```scala { .api }
class ExecutionEnvironment {
  def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](
    data: Iterator[T],
    tpe: TypeInformation[T]
  ): DataSet[T]
}
```

```scala
// From individual elements
val numbers = env.fromElements(1, 2, 3, 4, 5)
val words = env.fromElements("Hello", "World", "Flink")

// From collections
val list = List("A", "B", "C")
val dataSet = env.fromCollection(list)

// From iterator with explicit type information
val iterator = Iterator(("Alice", 25), ("Bob", 30))
val people = env.fromCollection(iterator, Types.TUPLE[(String, Int)])
```

### From Files

```scala { .api }
class ExecutionEnvironment {
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
  def readCsvFile[T: ClassTag: TypeInformation](
    filePath: String,
    lineDelimiter: String = "\n",
    fieldDelimiter: String = ",",
    ignoreFirstLine: Boolean = false,
    lenient: Boolean = false,
    includedFields: Array[Int] = null
  ): DataSet[T]
  def readFileOfPrimitives[T: ClassTag: TypeInformation](
    filePath: String,
    delimiter: String,
    tpe: Class[T]
  ): DataSet[T]
  def readFile[T: ClassTag: TypeInformation](
    inputFormat: FileInputFormat[T],
    filePath: String
  ): DataSet[T]
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}
```

```scala
// Read text file
val lines = env.readTextFile("path/to/input.txt")

// Read CSV file as tuples
val csvData = env.readCsvFile[(String, Int, Double)](
  filePath = "data.csv",
  ignoreFirstLine = true,
  fieldDelimiter = ","
)

// Read file of primitives (numbers, strings, etc.)
val numbers = env.readFileOfPrimitives("numbers.txt", "\n", classOf[Int])
val strings = env.readFileOfPrimitives("words.txt", " ", classOf[String])

// Read with custom file input format
val customData = env.readFile(new MyInputFormat(), "path/to/data")

// Create DataSet from generic input format
val inputFormat = new MyCustomInputFormat[MyType]()
val dataSet = env.createInput(inputFormat)
```

### Sequence Generation

```scala { .api }
class ExecutionEnvironment {
  def generateSequence(from: Long, to: Long): DataSet[Long]
  def fromParallelCollection[T: ClassTag: TypeInformation](
    c: SplittableIterator[T]
  ): DataSet[T]
}
```

```scala
// Generate sequence of numbers
val sequence = env.generateSequence(1, 1000000)

// Create from parallel collection
val parallelData = env.fromParallelCollection(new NumberSequenceIterator(1L, 100L))
```
166
167
## Execution
168
169
```scala { .api }
170
class ExecutionEnvironment {
171
def execute(): JobExecutionResult
172
def execute(jobName: String): JobExecutionResult
173
def executeAsync(): JobClient
174
def executeAsync(jobName: String): JobClient
175
def getExecutionPlan: String
176
def getLastJobExecutionResult: JobExecutionResult
177
}
178
```
179
180
### Execution Examples
181
182
```scala
183
val env = ExecutionEnvironment.getExecutionEnvironment
184
185
// Create and transform data
186
val result = env.fromElements(1, 2, 3, 4, 5)
187
.map(_ * 2)
188
.filter(_ > 5)
189
190
// Print results (triggers execution)
191
result.print()
192
193
// Execute explicitly with job name
194
val jobResult = env.execute("My Flink Job")
195
println(s"Job took ${jobResult.getJobExecutionTime} ms")
196
197
// Execute asynchronously
198
val jobClient = env.executeAsync("Async Job")
199
val completableFuture = jobClient.getJobExecutionResult
200
```
201
202
## Advanced Configuration
203
204
### Kryo Serialization
205
206
```scala
207
val config = env.getConfig
208
209
// Register types with Kryo
210
config.registerKryoType(classOf[MyClass])
211
config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[MyClassSerializer])
212
213
// Add default Kryo serializers
214
config.addDefaultKryoSerializer(classOf[LocalDateTime], classOf[LocalDateTimeSerializer])
215
```
216
217
### Closure Cleaner
218
219
```scala
220
val config = env.getConfig
221
222
// Enable/disable closure cleaner (default: enabled)
223
config.enableClosureCleaner()
224
config.disableClosureCleaner()
225
```
226
227
### Cached Files
228
229
```scala { .api }
230
class ExecutionEnvironment {
231
def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
232
}
233
```
234
235
```scala
236
// Register cached files for distributed access
237
env.registerCachedFile("hdfs://path/to/data.txt", "data")
238
env.registerCachedFile("s3://bucket/executable.sh", "script", executable = true)
239
240
// Access cached files in user functions via RuntimeContext
241
class MyMapFunction extends RichMapFunction[String, String] {
242
override def map(value: String): String = {
243
val cachedFile = getRuntimeContext.getDistributedCache.getFile("data")
244
// Use cached file...
245
value
246
}
247
}
248
```
249
250
### Global Job Parameters
251
252
```scala
253
import org.apache.flink.api.java.utils.ParameterTool
254
255
val params = ParameterTool.fromArgs(args)
256
env.getConfig.setGlobalJobParameters(params)
257
258
// Access in transformation functions
259
class MyMapFunction extends MapFunction[String, String] {
260
override def map(value: String): String = {
261
val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
262
.asInstanceOf[ParameterTool]
263
val prefix = params.get("prefix", "default")
264
s"$prefix: $value"
265
}
266
}
267
```
268
269
## Types
270
271
```scala { .api }
272
class ExecutionConfig {
273
def setParallelism(parallelism: Int): ExecutionConfig
274
def getParallelism: Int
275
def enableClosureCleaner(): ExecutionConfig
276
def disableClosureCleaner(): ExecutionConfig
277
def isClosureCleanerEnabled: Boolean
278
def setGlobalJobParameters(globalJobParameters: Configuration): ExecutionConfig
279
def getGlobalJobParameters: Configuration
280
def registerKryoType(tpe: Class[_]): ExecutionConfig
281
def registerTypeWithKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
282
def addDefaultKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
283
}
284
285
class JobExecutionResult {
286
def getJobExecutionTime: Long
287
def getAccumulatorResult[T](accumulatorName: String): T
288
def getAllAccumulatorResults: java.util.Map[String, Object]
289
}
290
291
trait JobClient {
292
def getJobExecutionResult: CompletableFuture[JobExecutionResult]
293
def getJobStatus: CompletableFuture[JobStatus]
294
def cancel(): CompletableFuture[Void]
295
def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
296
}
297
```