0
# Execution Environments
1
2
Execution environments provide the central context for Flink batch program execution, offering methods to create data sources, configure execution parameters, and trigger program execution.
3
4
## Capabilities
5
6
### ExecutionEnvironment
7
8
The abstract base class for all execution environments, providing the primary entry point for Flink batch programs.
9
10
```java { .api }
11
/**
12
* Get the default execution environment (auto-detects local vs remote)
13
* @return ExecutionEnvironment instance
14
*/
15
public static ExecutionEnvironment getExecutionEnvironment();
16
17
/**
18
* Create a local execution environment
19
* @return LocalEnvironment for local execution
20
*/
21
public static LocalEnvironment createLocalEnvironment();
22
23
/**
24
* Create a local execution environment with specific parallelism
25
* @param parallelism the parallelism for the local environment
26
* @return LocalEnvironment for local execution
27
*/
28
public static LocalEnvironment createLocalEnvironment(int parallelism);
29
30
/**
31
* Create a remote execution environment
32
* @param host hostname of the JobManager
33
* @param port port of the JobManager
34
* @param jarFiles JAR files to be sent to the cluster
35
* @return RemoteEnvironment for remote execution
36
*/
37
public static RemoteEnvironment createRemoteEnvironment(String host, int port, String... jarFiles);
38
```
39
40
**Usage Examples:**
41
42
```java
43
// Auto-detect environment (local when running in IDE, remote when submitted to cluster)
44
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
45
46
// Force local execution with default parallelism
47
LocalEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment();
48
49
// Local execution with specific parallelism
50
LocalEnvironment localEnvParallel = ExecutionEnvironment.createLocalEnvironment(4);
51
52
// Remote execution
53
RemoteEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
54
"localhost", 8081, "/path/to/your-job.jar");
55
```
56
57
### Data Source Creation
58
59
Methods for creating DataSets from various data sources.
60
61
```java { .api }
62
/**
63
* Create a DataSet from a Java collection
64
* @param data the collection to create the DataSet from
65
* @return DataSet containing the collection elements
66
*/
67
public <T> DataSet<T> fromCollection(Collection<T> data);
68
69
/**
70
* Create a DataSet from individual elements
71
* @param data the elements to create the DataSet from
72
* @return DataSet containing the specified elements
73
*/
74
@SafeVarargs
75
public final <T> DataSet<T> fromElements(T... data);
76
77
/**
78
* Read a text file and create a DataSet of Strings
79
* @param filePath path to the text file
80
* @return DataSet of lines from the file
81
*/
82
public DataSet<String> readTextFile(String filePath);
83
84
/**
85
* Read a text file with specific character encoding
86
* @param filePath path to the text file
87
* @param charsetName the charset name for decoding the file
88
* @return DataSet of lines from the file
89
*/
90
public DataSet<String> readTextFile(String filePath, String charsetName);
91
92
/**
93
* Read text file as StringValue objects
94
* @param filePath path to the text file
95
* @return DataSource where each element is a StringValue from the file
96
*/
97
public DataSource<StringValue> readTextFileWithValue(String filePath);
98
99
/**
100
* Read text file as StringValue objects with charset and error handling
101
* @param filePath path to the text file
102
* @param charsetName the charset name for decoding the file
103
* @param skipInvalidLines whether to skip lines that cannot be decoded
104
* @return DataSource where each element is a StringValue from the file
105
*/
106
public DataSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);
107
108
/**
109
* Read file containing primitive values
110
* @param filePath path to the file
111
* @param typeClass the class of the primitive type
112
* @return DataSource with elements of the primitive type
113
*/
114
public <X> DataSource<X> readFileOfPrimitives(String filePath, Class<X> typeClass);
115
116
/**
117
* Read file containing primitive values with custom delimiter
118
* @param filePath path to the file
119
* @param delimiter the delimiter separating values
120
* @param typeClass the class of the primitive type
121
* @return DataSource with elements of the primitive type
122
*/
123
public <X> DataSource<X> readFileOfPrimitives(String filePath, String delimiter, Class<X> typeClass);
124
125
/**
126
* Read a file with a custom input format
127
* @param inputFormat the input format to use
128
* @param filePath path to the file
129
* @return DataSet with elements read by the input format
130
*/
131
public <T> DataSet<T> readFile(FileInputFormat<T> inputFormat, String filePath);
132
133
/**
134
* Create a CSV reader for structured data
135
* @param filePath path to the CSV file
136
* @return CsvReader for configuration and DataSet creation
137
*/
138
public CsvReader readCsvFile(String filePath);
139
140
/**
141
* Generate a sequence of numbers
142
* @param from starting number (inclusive)
143
* @param to ending number (inclusive)
144
* @return DataSet containing the number sequence
145
*/
146
public DataSet<Long> generateSequence(long from, long to);
147
```
148
149
### Execution Control
150
151
Methods for configuring and executing Flink programs.
152
153
```java { .api }
154
/**
155
* Execute the program with a generated job name
156
* @return JobExecutionResult containing execution statistics
157
* @throws Exception if execution fails
158
*/
159
public JobExecutionResult execute() throws Exception;
160
161
/**
162
* Execute the program with a specific job name
163
* @param jobName name for the job
164
* @return JobExecutionResult containing execution statistics
165
* @throws Exception if execution fails
166
*/
167
public JobExecutionResult execute(String jobName) throws Exception;
168
169
/**
170
* Get the execution plan as JSON without executing
171
* @return JSON representation of the execution plan
172
* @throws Exception if plan generation fails
173
*/
174
public String getExecutionPlan() throws Exception;
175
176
/**
177
* Set the default parallelism for all operations
178
* @param parallelism the parallelism level
179
*/
180
public void setParallelism(int parallelism);
181
182
/**
183
* Get the current default parallelism
184
* @return the current parallelism level
185
*/
186
public int getParallelism();
187
188
/**
189
* Get the execution configuration
190
* @return ExecutionConfig for advanced configuration
191
*/
192
public ExecutionConfig getConfig();
193
```
194
195
### LocalEnvironment
196
197
Specialized execution environment for local execution in the current JVM.
198
199
```java { .api }
200
/**
201
* LocalEnvironment for local execution
202
* Inherits all methods from ExecutionEnvironment
203
* Executes programs in the current JVM process
204
*/
205
public class LocalEnvironment extends ExecutionEnvironment {
206
// Additional local-specific configuration methods available
207
}
208
```
209
210
### RemoteEnvironment
211
212
Specialized execution environment for remote execution on a Flink cluster.
213
214
```java { .api }
215
/**
216
* RemoteEnvironment for remote cluster execution
217
* Inherits all methods from ExecutionEnvironment
218
* Submits programs to a remote Flink cluster
219
*/
220
public class RemoteEnvironment extends ExecutionEnvironment {
221
// Additional remote-specific configuration methods available
222
}
223
```
224
225
### CollectionEnvironment
226
227
Specialized execution environment for collection-based execution (primarily for testing).
228
229
```java { .api }
230
/**
231
* CollectionEnvironment for collection-based execution
232
* Inherits all methods from ExecutionEnvironment
233
* Executes programs using Java collections (useful for testing)
234
*/
235
public class CollectionEnvironment extends ExecutionEnvironment {
236
// Collection-based execution methods
237
}
238
```
239
240
### ExecutionEnvironmentFactory
241
242
Factory interface for creating custom execution environments.
243
244
```java { .api }
245
/**
246
* Factory interface for creating custom ExecutionEnvironments
247
*/
248
public interface ExecutionEnvironmentFactory {
249
/**
250
* Create a custom execution environment
251
* @return ExecutionEnvironment instance
252
*/
253
ExecutionEnvironment createExecutionEnvironment();
254
}
255
```
256
257
## Types
258
259
```java { .api }
260
import org.apache.flink.api.java.ExecutionEnvironment;
261
import org.apache.flink.api.java.LocalEnvironment;
262
import org.apache.flink.api.java.RemoteEnvironment;
263
import org.apache.flink.api.java.CollectionEnvironment;
264
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
265
import org.apache.flink.api.java.operators.DataSource;
266
import org.apache.flink.api.common.JobExecutionResult;
267
import org.apache.flink.configuration.Configuration;
268
import org.apache.flink.core.execution.JobClient;
269
import org.apache.flink.types.StringValue;
270
```