# Execution Environment

The execution environment provides the entry point for creating and configuring Flink streaming applications. It manages job execution, parallelism settings, and environment-specific configurations.

## StreamExecutionEnvironment

The main entry point for all streaming applications.

```java { .api }
public abstract class StreamExecutionEnvironment {
    // Environment creation
    public static StreamExecutionEnvironment getExecutionEnvironment();
    public static LocalStreamEnvironment createLocalEnvironment();
    public static LocalStreamEnvironment createLocalEnvironment(int parallelism);
    public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, String... jarFiles);
    public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port, int parallelism, String... jarFiles);
    public static void setDefaultLocalParallelism(int parallelism);

    // Job execution
    public JobExecutionResult execute() throws Exception;
    public JobExecutionResult execute(String jobName) throws Exception;

    // Configuration
    public StreamExecutionEnvironment setParallelism(int parallelism);
    public int getParallelism();
    public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis);
    public long getBufferTimeout();
    public StreamExecutionEnvironment disableOperatorChaining();
    public void setNumberOfExecutionRetries(int numberOfExecutionRetries);
    public int getNumberOfExecutionRetries();
    public void setStateHandleProvider(StateHandleProvider<?> provider);
    public StateHandleProvider<?> getStateHandleProvider();

    // Checkpointing
    public StreamExecutionEnvironment enableCheckpointing(long interval);
    public StreamExecutionEnvironment enableCheckpointing();

    // Source creation - Element and Collection sources
    public DataStreamSource<Long> generateSequence(long from, long to);
    public DataStreamSource<Long> generateParallelSequence(long from, long to);
    public <OUT> DataStreamSource<OUT> fromElements(OUT... data);
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data);
    public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data, TypeInformation<OUT> typeInfo);
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, Class<OUT> type);
    public <OUT> DataStreamSource<OUT> fromCollection(Iterator<OUT> data, TypeInformation<OUT> typeInfo);
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, Class<OUT> type);
    public <OUT> DataStreamSource<OUT> fromParallelCollection(SplittableIterator<OUT> iterator, TypeInformation<OUT> typeInfo);

    // Source creation - Generic sources
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function);
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName);
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat);
    public <OUT> DataStreamSource<OUT> createInput(InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> typeInfo);

    // Built-in sources - File sources
    public DataStreamSource<String> readTextFile(String filePath);
    public DataStreamSource<String> readTextFile(String filePath, String charsetName);
    public DataStreamSource<StringValue> readTextFileWithValue(String filePath);
    public DataStreamSource<StringValue> readTextFileWithValue(String filePath, String charsetName, boolean skipInvalidLines);
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath);
    public <OUT> DataStreamSource<OUT> readFile(FileInputFormat<OUT> inputFormat, String filePath, FileProcessingMode watchType, long interval);
    public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, Class<OUT> typeClass);
    public <OUT> DataStreamSource<OUT> readFileOfPrimitives(String filePath, String delimiter, Class<OUT> typeClass);
    public DataStream<String> readFileStream(String filePath, long intervalMillis, WatchType watchType);

    // Built-in sources - Network sources
    public DataStreamSource<String> socketTextStream(String hostname, int port);
    public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter);
    public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter, long maxRetry);

    // Type system and configuration
    public ExecutionConfig getConfig();
    public void addDefaultKryoSerializer(Class<?> type, Serializer<?> serializer);
    public void addDefaultKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);
    public void registerType(Class<?> type);
    public void registerTypeWithKryoSerializer(Class<?> type, Serializer<?> serializer);
    public void registerTypeWithKryoSerializer(Class<?> type, Class<? extends Serializer<?>> serializerClass);

    // Utility methods
    public <F> F clean(F f);
    public StreamGraph getStreamGraph();
    public String getExecutionPlan();
}
```
## LocalStreamEnvironment

Environment for local execution, primarily used for development and testing.

```java { .api }
public class LocalStreamEnvironment extends StreamExecutionEnvironment {
    public LocalStreamEnvironment();
    public LocalStreamEnvironment(Configuration configuration);

    @Override
    public JobExecutionResult execute() throws Exception;

    @Override
    public JobExecutionResult execute(String jobName) throws Exception;
}
```
## RemoteStreamEnvironment

Environment for executing streaming jobs on a remote Flink cluster.

```java { .api }
public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
    public RemoteStreamEnvironment(String host, int port, String... jarFiles);
    public RemoteStreamEnvironment(String host, int port, int parallelism, String... jarFiles);

    @Override
    public JobExecutionResult execute() throws Exception;

    @Override
    public JobExecutionResult execute(String jobName) throws Exception;
}
```
## Usage Examples

### Basic Environment Setup

```java
// Get default environment (local or remote based on context)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Set parallelism
env.setParallelism(4);

// Set network buffer timeout
env.setBufferTimeout(100);

// Disable operator chaining for debugging
env.disableOperatorChaining();
```
### Local Environment

```java
// Create local environment with default parallelism
LocalStreamEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

// Create local environment with specific parallelism
LocalStreamEnvironment localEnv2 = StreamExecutionEnvironment.createLocalEnvironment(2);
```
### Remote Environment

```java
// Connect to remote cluster
RemoteStreamEnvironment remoteEnv = StreamExecutionEnvironment
    .createRemoteEnvironment("localhost", 6123, "/path/to/job.jar");

// With specific parallelism
RemoteStreamEnvironment remoteEnv2 = StreamExecutionEnvironment
    .createRemoteEnvironment("localhost", 6123, 4, "/path/to/job.jar");
```
### Creating Sources

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From elements
DataStreamSource<String> stream1 = env.fromElements("hello", "world");

// From collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStreamSource<Integer> stream2 = env.fromCollection(numbers);

// From file
DataStreamSource<String> stream3 = env.readTextFile("/path/to/input.txt");

// From socket
DataStreamSource<String> stream4 = env.socketTextStream("localhost", 9999);

// Custom source
DataStreamSource<String> stream5 = env.addSource(new MyCustomSource());
```
### Job Execution

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create your data stream pipeline
DataStream<String> processed = env
    .socketTextStream("localhost", 9999)
    .map(String::toUpperCase)
    .filter(s -> s.length() > 5);

processed.print();

// Execute the job
JobExecutionResult result = env.execute("My Streaming Job");

// Access execution results
System.out.println("Job execution time: " + result.getNetRuntime());
```
## Types

```java { .api }
public class JobExecutionResult {
    public long getNetRuntime();
    public long getNetRuntime(TimeUnit desiredUnit);
    public Map<String, Object> getAllAccumulatorResults();
    public <T> T getAccumulatorResult(String name);
}

public interface StateHandleProvider<T extends StateHandle> extends Serializable {
    T createStateHandle(Serializable state) throws Exception;
}

public enum FileProcessingMode {
    PROCESS_ONCE,
    PROCESS_CONTINUOUSLY
}

public enum WatchType {
    PROCESS_ONCE,
    REPROCESS_WITH_APPENDED
}

public class StreamGraph {
    // Internal representation of the streaming dataflow graph
}

public class StringValue implements Value {
    // Flink's StringValue type for efficient string handling
    public StringValue();
    public StringValue(String value);
    public String getValue();
    public void setValue(String value);
}
```