# Execution Environment

The ExecutionEnvironment is the main entry point for Apache Flink batch programs. It provides methods to create DataSets from various sources, control job execution parameters, and submit jobs for execution.

## Creating Execution Environments

```scala { .api }
object ExecutionEnvironment {
  // Get an environment based on the execution context (local or cluster)
  def getExecutionEnvironment: ExecutionEnvironment

  // Create a local execution environment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment

  // Create an environment for testing with collections
  def createCollectionsEnvironment: ExecutionEnvironment

  // Create a remote execution environment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*): ExecutionEnvironment
}
```

## Configuration Methods

```scala { .api }
class ExecutionEnvironment {
  // Get the underlying Java execution environment
  def getJavaEnv: JavaEnv

  // Get the execution configuration
  def getConfig: ExecutionConfig

  // Set the default parallelism for all operations
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int

  // Configure the restart strategy
  def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
  def getRestartStrategy: RestartStrategyConfiguration

  // Set the number of execution retries (deprecated; prefer setRestartStrategy)
  def setNumberOfExecutionRetries(numRetries: Int): Unit
  def getNumberOfExecutionRetries: Int

  // Kryo serialization registration
  def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
      clazz: Class[_],
      serializer: T
  ): Unit
  def registerTypeWithKryoSerializer(
      clazz: Class[_],
      serializer: Class[_ <: Serializer[_]]
  ): Unit
  def addDefaultKryoSerializer[T](clazz: Class[T], serializer: Serializer[T]): Unit
  def addDefaultKryoSerializer[T](clazz: Class[T], serializerClass: Class[_ <: Serializer[T]]): Unit
  def registerKryoType(clazz: Class[_]): Unit
  def registerType(typeClass: Class[_]): Unit

  // Cached file registration
  def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit

  // Default parallelism for local execution
  def setDefaultLocalParallelism(parallelism: Int): Unit
}
```

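A minimal configuration sketch; the restart strategy values and the cached-file path are illustrative placeholders:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.restartstrategy.RestartStrategies

val env = ExecutionEnvironment.getExecutionEnvironment

// Run each operator with 8 parallel instances unless overridden per operator
env.setParallelism(8)

// On failure, retry the job up to 3 times with a 10-second delay in between
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))

// Ship a file to all workers; tasks can read it from the distributed cache
// under the name "lookup-data"
env.registerCachedFile("hdfs:///shared/lookup.txt", "lookup-data")
```
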
## Job Management

```scala { .api }
class ExecutionEnvironment {
  // Get the unique job identifier
  def getId: JobID
  def getIdString: String

  // Session management
  def startNewSession(): Unit
  def setSessionTimeout(timeout: Long): Unit
  def getSessionTimeout: Long

  // Execution results
  def getLastJobExecutionResult: JobExecutionResult

  // Buffer timeout configuration
  def setBufferTimeout(timeoutMillis: Long): ExecutionEnvironment
  def getBufferTimeout: Long
}
```

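A short sketch of the result accessors, assuming a writable local output path:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

env.fromElements(1, 2, 3).writeAsText("file:///tmp/job-mgmt-out")
env.execute("Job Management Example")

// execute() blocks until the job finishes; the same result is afterwards
// also reachable through getLastJobExecutionResult
val result = env.getLastJobExecutionResult
println(s"Job ${env.getIdString} ran for ${result.getNetRuntime} ms")
```
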
## Job Execution

```scala { .api }
class ExecutionEnvironment {
  // Execute the job and return its results
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult

  // Get the execution plan as a JSON string, without running the job
  def getExecutionPlan(): String
}
```

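For example, the plan can be inspected without running the job; at least one sink must be defined first so there is a complete program to translate:

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.generateSequence(1, 100)
  .filter(_ % 2 == 0)
  .writeAsText("file:///tmp/evens")

// Prints the optimizer plan as JSON (e.g. for Flink's plan visualizer)
println(env.getExecutionPlan())
```
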
## Data Source Creation

```scala { .api }
class ExecutionEnvironment {
  // Create DataSets from collections
  def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
  def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]
  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]
  def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]

  // Generate a sequence of numbers
  def generateSequence(from: Long, to: Long): DataSet[Long]

  // Read from files (readCsvFile also takes optional parameters for
  // delimiters, quoting, and field selection, omitted here)
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
  def readCsvFile[T: ClassTag : TypeInformation](filePath: String): DataSet[T]
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]

  // Custom input formats
  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]

  // Hadoop integration
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
      inputFormat: MapredInputFormat[K, V],
      keyClass: Class[K],
      valueClass: Class[V],
      inputPath: String
  ): DataSet[(K, V)]

  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
      inputFormat: MapreduceInputFormat[K, V],
      keyClass: Class[K],
      valueClass: Class[V],
      inputPath: String
  ): DataSet[(K, V)]
}
```

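A few representative sources as a sketch; the Sale case class and all file paths are hypothetical:

```scala
import org.apache.flink.api.scala._

// Hypothetical record type matching a three-column CSV file
case class Sale(id: Int, item: String, amount: Double)

val env = ExecutionEnvironment.getExecutionEnvironment

// File-based sources
val sales: DataSet[Sale] = env.readCsvFile[Sale]("file:///data/sales.csv")
val lines: DataSet[String] = env.readTextFile("file:///data/log.txt")

// In-memory sources, handy for quick tests
val numbers: DataSet[Long] = env.generateSequence(1, 1000)
val words: DataSet[String] = env.fromCollection(Seq("a", "b", "c"))
```
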
## Usage Examples

### Basic Environment Setup

```scala
import org.apache.flink.api.scala._

// Get execution environment (local or cluster based on context)
val env = ExecutionEnvironment.getExecutionEnvironment

// Set parallelism
env.setParallelism(4)

// Create data and print it. print() is an eager sink: it triggers
// execution itself, so calling env.execute() afterwards would fail
// unless further sinks are defined.
val data = env.fromElements(1, 2, 3, 4, 5)
data.print()
```

### Local Environment for Testing

```scala
import org.apache.flink.api.scala._

// Create local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(2)

val data = localEnv.fromCollection(List("hello", "world", "flink"))
val result = data.map(_.toUpperCase)

// print() runs the local job eagerly; no explicit execute() call is needed
result.print()
```

### Remote Cluster Execution

```scala
import org.apache.flink.api.scala._

// Connect to a remote Flink cluster; the jar with this program's classes
// must be listed so it can be shipped to the cluster
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  "flink-jobmanager",
  6123,
  "/path/to/my-job.jar"
)

// Classic word count: tokenize, pair each word with 1, then sum per word
val data = remoteEnv.readTextFile("hdfs:///data/input.txt")
val counts = data
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupBy(0)
  .sum(1)
counts.writeAsText("hdfs:///data/output")

remoteEnv.execute("Word Count Job")
```

## Accumulators

Accumulators are a simple and efficient means to aggregate values from distributed functions back to the client program: each parallel task updates its local accumulator, and Flink merges the partial results and delivers them with the job result.

### Accumulator Registration

Accumulators are not registered on the ExecutionEnvironment. They are registered by name on the RuntimeContext inside a rich function, as shown in the usage example below:

```scala { .api }
trait RuntimeContext {
  // Register an accumulator under a job-wide unique name
  def addAccumulator[V, A <: java.io.Serializable](name: String, accumulator: Accumulator[V, A]): Unit

  // Retrieve an accumulator registered earlier in this task
  def getAccumulator[V, A <: java.io.Serializable](name: String): Accumulator[V, A]
}
```

### Accumulator Types

```scala { .api }
// Base accumulator interface: V is the type of added values, R the result type
trait Accumulator[V, R] {
  def add(value: V): Unit
  def getLocalValue: R
  def resetLocal(): Unit
  def merge(other: Accumulator[V, R]): Unit
  def clone(): Accumulator[V, R]
}

// Built-in accumulator implementations
class IntCounter extends Accumulator[Int, Int]
class LongCounter extends Accumulator[Long, Long]
class DoubleCounter extends Accumulator[Double, Double]
class Histogram extends Accumulator[Int, java.util.TreeMap[Int, Int]]

// List accumulator for collecting values
class ListAccumulator[T] extends Accumulator[T, java.util.ArrayList[T]]

// Maximum/minimum accumulators
class IntMaximum extends Accumulator[Int, Int]
class IntMinimum extends Accumulator[Int, Int]
class DoubleMaximum extends Accumulator[Double, Double]
class DoubleMinimum extends Accumulator[Double, Double]
```

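As a quick illustration of the accumulator contract itself, a Histogram counts occurrences per added value; in a job it would be registered on the RuntimeContext like the counters in the example below:

```scala
import org.apache.flink.api.common.accumulators.Histogram

val hist = new Histogram()
hist.add(1)
hist.add(1)
hist.add(2)

// Per-value counts; in a job, Flink merges these across parallel tasks
println(hist.getLocalValue) // {1=2, 2=1}
```
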
### Usage Example

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.common.accumulators.{IntCounter, ListAccumulator}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.configuration.Configuration

val env = ExecutionEnvironment.getExecutionEnvironment

// Count processed records and collect error lines via accumulators
class CountingMapFunction extends RichMapFunction[String, String] {
  private val counter = new IntCounter()
  private val errorList = new ListAccumulator[String]()

  override def open(config: Configuration): Unit = {
    // Register accumulators with the runtime context
    getRuntimeContext.addAccumulator("processed-count", counter)
    getRuntimeContext.addAccumulator("errors", errorList)
  }

  override def map(value: String): String = {
    counter.add(1)

    if (value.contains("error")) {
      errorList.add(value)
      "ERROR_PROCESSED"
    } else {
      value.toUpperCase
    }
  }
}

val data = env.fromElements("hello", "error1", "world", "error2", "flink")
val result = data.map(new CountingMapFunction())

// A program needs at least one sink before execute(); the mapped data is
// discarded here because only the accumulator results are of interest
result.output(new DiscardingOutputFormat[String]())

// Execute and read the accumulator results from the job result
val jobResult = env.execute("Accumulator Example")
val processedCount = jobResult.getAccumulatorResult[Int]("processed-count")
val errors = jobResult.getAccumulatorResult[java.util.List[String]]("errors")

println(s"Processed $processedCount records")
println(s"Found errors: ${errors.size()}")
```