0
# Execution Environment
1
2
The ExecutionEnvironment is the primary entry point for Flink programs, providing the context in which jobs are executed and methods to create DataSets from various sources.
3
4
## Capabilities
5
6
### Environment Creation
7
8
Factory methods for creating execution environments with different configurations.
9
10
```scala { .api }
11
object ExecutionEnvironment {
12
/**
13
* Creates an execution environment based on context (local or remote)
14
* @return ExecutionEnvironment instance
15
*/
16
def getExecutionEnvironment: ExecutionEnvironment
17
18
/**
19
* Creates a local execution environment
20
* @param parallelism Degree of parallelism (default: number of CPU cores)
21
* @return Local ExecutionEnvironment
22
*/
23
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
24
25
/**
26
* Creates a local environment with web UI
27
* @param config Optional configuration
28
* @return Local ExecutionEnvironment with web interface
29
*/
30
def createLocalEnvironmentWithWebUI(config: Configuration = null): ExecutionEnvironment
31
32
/**
33
* Creates environment for remote cluster execution
34
* @param host Cluster host address
35
* @param port Cluster port
36
* @param jarFiles JAR files to distribute to cluster
37
* @return Remote ExecutionEnvironment
38
*/
39
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
40
41
/**
42
* Creates environment for testing with collections
43
* @return Collections-based ExecutionEnvironment
44
*/
45
def createCollectionsEnvironment: ExecutionEnvironment
46
}
47
```
48
49
**Usage Examples:**
50
51
```scala
52
import org.apache.flink.api.scala._
53
54
// Context-aware environment (local or remote based on context)
55
val env = ExecutionEnvironment.getExecutionEnvironment
56
57
// Local environment with specific parallelism
58
val localEnv = ExecutionEnvironment.createLocalEnvironment(4)
59
60
// Remote cluster environment
61
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
62
"cluster-host", 6123, "my-job.jar"
63
)
64
```
65
66
### Environment Configuration
67
68
Methods for configuring execution parameters and behavior.
69
70
```scala { .api }
71
class ExecutionEnvironment {
72
/**
73
* Sets the parallelism for operations in this environment
74
* @param parallelism Degree of parallelism
75
*/
76
def setParallelism(parallelism: Int): Unit
77
78
/**
79
* Gets the current parallelism setting
80
* @return Current parallelism level
81
*/
82
def getParallelism: Int
83
84
/**
85
* Gets the execution configuration object
86
* @return ExecutionConfig for fine-tuning behavior
87
*/
88
def getConfig: ExecutionConfig
89
90
/**
91
* Configures the environment with settings
92
* @param configuration Configuration object
93
* @param classLoader Class loader for user code
94
*/
95
def configure(configuration: ReadableConfig, classLoader: ClassLoader): Unit
96
97
/**
98
* Sets the buffer timeout for network transfers
99
* @param timeoutMillis Timeout in milliseconds
100
*/
101
def setBufferTimeout(timeoutMillis: Long): Unit
102
103
/**
104
* Gets the current buffer timeout setting
105
* @return Current buffer timeout in milliseconds
106
*/
107
def getBufferTimeout: Long
108
109
/**
110
* Enables or disables object reuse mode for better performance
111
* @param objectReuse Whether to enable object reuse
112
*/
113
def setObjectReuse(objectReuse: Boolean): Unit
114
115
/**
116
* Gets the current object reuse setting
117
* @return True if object reuse is enabled
118
*/
119
def getObjectReuse: Boolean
120
121
/**
122
* Sets the default maximum degree of parallelism
123
* @param maxParallelism Maximum parallelism level
124
*/
125
def setMaxParallelism(maxParallelism: Int): Unit
126
127
/**
128
* Gets the default maximum degree of parallelism
129
* @return Maximum parallelism level
130
*/
131
def getMaxParallelism: Int
132
133
/**
 * Sets the number of times that failed tasks are re-executed
 * @param numberOfExecutionRetries Number of retries for failed tasks
 */
def setNumberOfExecutionRetries(numberOfExecutionRetries: Int): Unit
138
139
/**
140
* Gets the number of execution retries
141
* @return Number of retries configured
142
*/
143
def getNumberOfExecutionRetries: Int
144
}
145
```
146
147
### Restart Strategy Configuration
148
149
Configure job restart behavior for fault tolerance.
150
151
```scala { .api }
152
class ExecutionEnvironment {
153
/**
154
* Sets the restart strategy for job failures
155
* @param restartStrategyConfiguration Restart strategy configuration
156
*/
157
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
158
159
/**
160
* Gets the current restart strategy
161
* @return Current restart strategy configuration
162
*/
163
def getRestartStrategy: RestartStrategyConfiguration
164
}
165
```
166
167
### Data Source Creation
168
169
Create DataSets from various data sources including files, collections, and custom formats.
170
171
```scala { .api }
172
class ExecutionEnvironment {
173
/**
174
* Creates DataSet from an iterable collection
175
* @param data Iterable collection of elements
176
* @return DataSet containing the collection elements
177
*/
178
def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
179
180
/**
181
* Creates DataSet from individual elements
182
* @param data Variable arguments of elements
183
* @return DataSet containing the elements
184
*/
185
def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
186
187
/**
188
* Creates DataSet from a parallel collection
189
* @param iterator Splittable iterator for parallel processing
190
* @return DataSet from parallel collection
191
*/
192
def fromParallelCollection[T: TypeInformation: ClassTag](iterator: SplittableIterator[T]): DataSet[T]
193
194
/**
195
* Generates a sequence of numbers
196
* @param from Starting number (inclusive)
197
* @param to Ending number (inclusive)
198
* @return DataSet containing the number sequence
199
*/
200
def generateSequence(from: Long, to: Long): DataSet[Long]
201
}
202
```
203
204
### File Input Operations
205
206
Read data from various file formats and sources.
207
208
```scala { .api }
209
class ExecutionEnvironment {
210
/**
211
* Reads a text file as DataSet of strings
212
* @param filePath Path to the text file
213
* @param charsetName Character encoding (default: UTF-8)
214
* @return DataSet of text lines
215
*/
216
def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
217
218
/**
219
* Reads a text file with default value for empty files
220
* @param filePath Path to the text file
221
* @param defaultValue Default value if file is empty
222
* @param charsetName Character encoding (default: UTF-8)
223
* @return DataSet of text lines with fallback
224
*/
225
def readTextFileWithValue(filePath: String, defaultValue: String, charsetName: String = "UTF-8"): DataSet[String]
226
227
/**
228
* Reads a text file with collection fallback for empty files
229
* @param filePath Path to the text file
230
* @param defaultValues Collection of default values if file is empty
231
* @param charsetName Character encoding (default: UTF-8)
232
* @return DataSet of text lines with collection fallback
233
*/
234
def readTextFileWithValue(filePath: String, defaultValues: Iterable[String], charsetName: String = "UTF-8"): DataSet[String]
235
236
/**
237
* Reads a CSV file into typed DataSet
238
* @param filePath Path to the CSV file
239
* @return DataSet of parsed CSV records
240
*/
241
def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
242
243
/**
244
* Reads primitive values from a file
245
* @param filePath Path to the file
246
* @param delimiter Value delimiter
247
* @return DataSet of primitive values
248
*/
249
def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, delimiter: String = "\n"): DataSet[T]
250
251
/**
252
* Reads primitive values from a file with default value fallback
253
* @param filePath Path to the file
254
* @param defaultValue Default value if file is empty
255
* @param delimiter Value delimiter
256
* @return DataSet of primitive values with fallback
257
*/
258
def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, defaultValue: T, delimiter: String = "\n"): DataSet[T]
259
260
/**
261
* Reads primitive values from a file with collection fallback
262
* @param filePath Path to the file
263
* @param defaultValues Collection of default values if file is empty
264
* @param delimiter Value delimiter
265
* @return DataSet of primitive values with collection fallback
266
*/
267
def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, defaultValues: Iterable[T], delimiter: String = "\n"): DataSet[T]
268
269
/**
270
* Reads Hadoop SequenceFile format
271
* @param keyClass Class type for keys
272
* @param valueClass Class type for values
273
* @param filePath Path to the sequence file
274
* @return DataSet of key-value pairs
275
*/
276
def readSequenceFile[K: ClassTag: TypeInformation, V: ClassTag: TypeInformation](
277
keyClass: Class[K],
278
valueClass: Class[V],
279
filePath: String
280
): DataSet[(K, V)]
281
282
/**
283
* Reads Hadoop SequenceFile with Writables
284
* @param keyClass Writable key class
285
* @param valueClass Writable value class
286
* @param filePath Path to the sequence file
287
* @return DataSet of Writable key-value pairs
288
*/
289
def readHadoopFile[K <: Writable: ClassTag: TypeInformation, V <: Writable: ClassTag: TypeInformation](
290
keyClass: Class[K],
291
valueClass: Class[V],
292
filePath: String
293
): DataSet[(K, V)]
294
295
/**
296
* Reads file using custom input format
297
* @param inputFormat Custom file input format
298
* @param filePath Path to the file
299
* @return DataSet with custom format parsing
300
*/
301
def readFile[T: ClassTag: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataSet[T]
302
303
/**
304
* Creates DataSet from custom input format
305
* @param inputFormat Custom input format implementation
306
* @return DataSet using custom input
307
*/
308
def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
309
310
/**
311
* Creates DataSet from custom input format with generic parameters
312
* @param inputFormat Custom input format with type parameters
313
* @param typeInfo Implicit type information for the result type
314
* @return DataSet using custom input with type safety
315
*/
316
def createInput[T](inputFormat: InputFormat[T, _ <: InputSplit])(implicit typeInfo: TypeInformation[T]): DataSet[T]
317
318
/**
319
* Creates DataSet from Hadoop input format
320
* @param hadoopInputFormat Hadoop input format wrapper instance
321
* @param keyClass Key type class
322
* @param valueClass Value type class
323
* @param job Hadoop job configuration
324
* @return DataSet from Hadoop source
325
*/
326
def createHadoopInput[K, V](
327
hadoopInputFormat: HadoopInputFormat[K, V],
328
keyClass: Class[K],
329
valueClass: Class[V],
330
job: Job
331
): DataSet[(K, V)]
332
}
333
```
334
335
### Serialization Configuration
336
337
Configure Kryo serialization and type registration for custom types.
338
339
```scala { .api }
340
class ExecutionEnvironment {
341
/**
342
* Registers a type with a Kryo serializer instance
343
* @param clazz Class to register
344
* @param serializer Serializer instance
345
*/
346
def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
347
clazz: Class[_],
348
serializer: T
349
): Unit
350
351
/**
352
* Registers a type with a Kryo serializer class
353
* @param clazz Class to register
354
* @param serializer Serializer class
355
*/
356
def registerTypeWithKryoSerializer(
357
clazz: Class[_],
358
serializer: Class[_ <: Serializer[_]]
359
): Unit
360
361
/**
362
* Adds a default Kryo serializer for a type
363
* @param clazz Class to register
364
* @param serializer Serializer class
365
*/
366
def addDefaultKryoSerializer(
367
clazz: Class[_],
368
serializer: Class[_ <: Serializer[_]]
369
): Unit
370
371
/**
372
* Adds a default Kryo serializer instance for a type
373
* @param clazz Class to register
374
* @param serializer Serializer instance
375
*/
376
def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
377
clazz: Class[_],
378
serializer: T
379
): Unit
380
381
/**
382
* Registers a type with Kryo (uses default serialization)
383
* @param typeClass Class to register
384
*/
385
def registerType(typeClass: Class[_]): Unit
386
387
/**
388
* Registers multiple types with Kryo
389
* @param types Classes to register
390
*/
391
def registerType(types: Class[_]*): Unit
392
393
/**
394
* Enables/disables force Kryo serialization for all types
395
* @param forceKryo Whether to force Kryo for all serialization
396
*/
397
def setForceKryo(forceKryo: Boolean): Unit
398
399
/**
400
* Enables/disables force Avro serialization for generic types
401
* @param forceAvro Whether to force Avro serialization
402
*/
403
def setForceAvro(forceAvro: Boolean): Unit
404
405
/**
406
* Disables auto type registration with Kryo
407
*/
408
def disableAutoTypeRegistration(): Unit
409
410
/**
411
* Enables auto type registration with Kryo
412
*/
413
def enableAutoTypeRegistration(): Unit
414
415
/**
416
* Gets whether auto type registration is enabled
417
* @return True if auto registration is enabled
418
*/
419
def hasAutoTypeRegistrationEnabled: Boolean
420
}
421
```
422
423
### Job Execution
424
425
Execute jobs and retrieve results, with support for both synchronous and asynchronous execution.
426
427
```scala { .api }
428
class ExecutionEnvironment {
429
/**
430
* Executes the job and waits for completion
431
* @return JobExecutionResult with execution statistics
432
*/
433
def execute(): JobExecutionResult
434
435
/**
436
* Executes the job with a custom name
437
* @param jobName Name for the job
438
* @return JobExecutionResult with execution statistics
439
*/
440
def execute(jobName: String): JobExecutionResult
441
442
/**
443
* Executes the job asynchronously
444
* @return JobClient for monitoring execution
445
*/
446
def executeAsync(): JobClient
447
448
/**
449
* Executes the job asynchronously with a custom name
450
* @param jobName Name for the job
451
* @return JobClient for monitoring execution
452
*/
453
def executeAsync(jobName: String): JobClient
454
455
/**
456
* Gets the result of the last job execution
457
* @return JobExecutionResult from last execution
458
*/
459
def getLastJobExecutionResult: JobExecutionResult
460
}
461
```
462
463
### Job Listeners and Monitoring
464
465
Register listeners for job lifecycle events.
466
467
```scala { .api }
468
class ExecutionEnvironment {
469
/**
470
* Registers a job listener for execution events
471
* @param jobListener Listener for job events
472
*/
473
def registerJobListener(jobListener: JobListener): Unit
474
475
/**
476
* Clears all registered job listeners
477
*/
478
def clearJobListeners(): Unit
479
}
480
```
481
482
### Program Planning
483
484
Access to execution plan generation and optimization details.
485
486
```scala { .api }
487
class ExecutionEnvironment {
488
/**
489
* Gets the execution plan as JSON string
490
* @return Execution plan in JSON format
491
*/
492
def getExecutionPlan(): String
493
494
/**
495
* Creates a program plan for optimization
496
* @param jobName Optional job name
497
* @return Program plan object
498
*/
499
def createProgramPlan(jobName: String = ""): Plan
500
}
501
```
502
503
### Distributed Cache
504
505
Register files for distribution to all cluster nodes.
506
507
```scala { .api }
508
class ExecutionEnvironment {
509
/**
510
* Registers a file in the distributed cache
511
* @param filePath Path to the file
512
* @param name Name for accessing the cached file
513
* @param executable Whether the file should be executable
514
*/
515
def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
516
}
517
```
518
519
### Utility Methods
520
521
Additional utility methods for DataSet operations.
522
523
```scala { .api }
524
class ExecutionEnvironment {
525
/**
526
* Creates union of multiple DataSets
527
* @param sets Sequence of DataSets to union
528
* @return Unified DataSet
529
*/
530
def union[T](sets: Seq[DataSet[T]]): DataSet[T]
531
}
532
```
533
534
## Types
535
536
```scala { .api }
537
trait RestartStrategyConfiguration
538
539
class JobExecutionResult {
540
def getJobID: JobID
541
def getNetRuntime: Long
542
def getNetRuntime(timeUnit: TimeUnit): Long
543
def getAllAccumulatorResults: java.util.Map[String, Object]
544
def getAccumulatorResult[T](accumulatorName: String): T
545
}
546
547
trait JobClient {
548
def getJobID: JobID
549
def cancel(): CompletableFuture[Void]
550
def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
551
}
552
553
trait JobListener {
554
def onJobSubmitted(jobClient: JobClient, t: Throwable): Unit
555
def onJobExecuted(jobExecutionResult: JobExecutionResult, t: Throwable): Unit
556
}
557
558
abstract class Plan {
559
def getJobName: String
560
def setJobName(jobName: String): Unit
561
}
562
```