# Stream Execution Environment

The `StreamExecutionEnvironment` is the main entry point for creating and configuring Apache Flink streaming applications. It provides methods to create data streams from various sources, configure runtime settings, and execute streaming jobs.

## Capabilities

### Environment Creation

Create different types of execution environments based on your deployment needs.

```java { .api }
/**
 * Get the default execution environment, which is determined based on the context.
 * In an IDE or as a regular program: creates a local environment
 * In a cluster: creates the cluster environment
 */
static StreamExecutionEnvironment getExecutionEnvironment();

/**
 * Create a local execution environment for testing and development
 * @param parallelism - the parallelism for the local environment
 */
static StreamExecutionEnvironment createLocalEnvironment(int parallelism);

/**
 * Create a local execution environment with default parallelism
 */
static StreamExecutionEnvironment createLocalEnvironment();

/**
 * Create a remote execution environment for cluster deployment
 * @param host - the host of the JobManager
 * @param port - the port of the JobManager
 * @param jarFiles - JAR files to be shipped to the cluster
 */
static StreamExecutionEnvironment createRemoteEnvironment(
    String host,
    int port,
    String... jarFiles
);
```

**Usage Examples:**

```java
// Get default environment (most common)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create local environment for testing, with parallelism 4
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(4);

// Create remote environment for a cluster.
// Recent Flink versions submit jobs through the JobManager's REST endpoint
// (default port 8081); adjust host and port to match your cluster.
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
    "jobmanager-host", 8081, "my-job.jar"
);
```
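
To show how a factory method fits into a complete program, here is a minimal sketch that builds and runs a small bounded job in an embedded local environment. It assumes Flink 1.12 or later (for `executeAndCollect`) with `flink-streaming-java` and `flink-clients` on the classpath; the class name `LocalEnvironmentSketch` and the method `runLocally` are illustrative and not part of Flink.

```java
import java.util.List;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalEnvironmentSketch {

    /** Runs a tiny bounded job locally and returns the collected output. */
    public static List<String> runLocally() throws Exception {
        // Parallelism 1 keeps the element order stable for this small example.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        return env.fromElements("hello", "world", "flink")
                .map(word -> word.toUpperCase())
                // Lambdas can hide the result type from Flink, so state it explicitly.
                .returns(Types.STRING)
                // Triggers execution and collects at most 10 records on the client.
                .executeAndCollect(10);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runLocally());
    }
}
```

Swapping `createLocalEnvironment(1)` for `getExecutionEnvironment()` should let the same program run unchanged when it is submitted to a cluster.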

### Data Source Creation

Create data streams from various built-in sources.

```java { .api }
/**
 * Create a DataStream from the given elements
 * @param data - the elements to create the stream from
 */
<T> DataStreamSource<T> fromElements(T... data);

/**
 * Create a DataStream from a collection
 * @param data - the collection to create the stream from
 */
<T> DataStreamSource<T> fromCollection(Collection<T> data);

/**
 * Create a DataStream from a collection with type information
 * @param data - the collection to create the stream from
 * @param typeInfo - explicit type information
 */
<T> DataStreamSource<T> fromCollection(Collection<T> data, TypeInformation<T> typeInfo);

/**
 * Add a custom source function to create a data stream
 * @param function - the source function
 */
<T> DataStreamSource<T> addSource(SourceFunction<T> function);

/**
 * Add a custom source function with type information
 * @param function - the source function
 * @param typeInfo - explicit type information
 */
<T> DataStreamSource<T> addSource(SourceFunction<T> function, TypeInformation<T> typeInfo);

/**
 * Read text from a socket connection
 * @param hostname - the hostname to connect to
 * @param port - the port to connect to
 */
DataStreamSource<String> socketTextStream(String hostname, int port);

/**
 * Read text from a socket with custom delimiter
 * @param hostname - the hostname to connect to
 * @param port - the port to connect to
 * @param delimiter - the delimiter to split records
 */
DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter);

/**
 * Read the entire text file from the file system
 * @param filePath - the path to the file
 */
DataStreamSource<String> readTextFile(String filePath);

/**
 * Read text file with character set
 * @param filePath - the path to the file
 * @param charsetName - the character set name
 */
DataStreamSource<String> readTextFile(String filePath, String charsetName);

/**
 * Generate a sequence of numbers (deprecated)
 * @deprecated Use fromSequence() instead
 * @param from - start of sequence (inclusive)
 * @param to - end of sequence (inclusive)
 */
@Deprecated
DataStreamSource<Long> generateSequence(long from, long to);

/**
 * Create a sequence of numbers from 'from' to 'to'
 * @param from - start of sequence (inclusive)
 * @param to - end of sequence (inclusive)
 * @return DataStream of Long values
 */
DataStreamSource<Long> fromSequence(long from, long to);

/**
 * Create a DataStream from a modern unified Source
 * @param source - the Source to read from
 * @param timestampsAndWatermarks - watermark strategy for event time
 * @param sourceName - name of the source
 * @return DataStream from the source
 */
<T> DataStreamSource<T> fromSource(
    Source<T, ?, ?> source,
    WatermarkStrategy<T> timestampsAndWatermarks,
    String sourceName
);
```

**Usage Examples:**

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// From elements
DataStream<String> elements = env.fromElements("hello", "world", "flink");

// From collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
DataStream<Integer> fromList = env.fromCollection(numbers);

// Socket stream
DataStream<String> socketStream = env.socketTextStream("localhost", 9999);

// File stream
DataStream<String> fileStream = env.readTextFile("/path/to/input.txt");

// Sequence (fromSequence replaces the deprecated generateSequence)
DataStream<Long> sequence = env.fromSequence(1, 1000);

// Custom source (MyCustomSource and MyEvent stand for user-defined classes)
DataStream<MyEvent> customStream = env.addSource(new MyCustomSource());
```
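
The examples above do not exercise `fromSource`, so the following sketch shows one way to call it. It assumes Flink 1.12 or later in the 1.x line, where `NumberSequenceSource` ships with Flink as a simple bounded `Source`; the class name `FromSourceSketch`, the method `readNumbers`, and the source name `"number-sequence"` are illustrative. No event-time logic is involved, so the sketch passes `WatermarkStrategy.noWatermarks()`.

```java
import java.util.List;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromSourceSketch {

    /** Reads the numbers 1 through 5 from a unified Source and collects them. */
    public static List<Long> readNumbers() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        DataStreamSource<Long> numbers = env.fromSource(
                new NumberSequenceSource(1, 5),      // bounded source, both ends inclusive
                WatermarkStrategy.noWatermarks(),    // no event-time processing needed here
                "number-sequence");                  // name shown in logs and the web UI

        // Triggers execution and returns at most 10 records to the client.
        return numbers.executeAndCollect(10);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readNumbers());
    }
}
```

Connector sources such as Kafka or file sources plug into the same three-argument call, usually with a watermark strategy that extracts timestamps from the records.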

### Job Execution

Execute streaming jobs and handle execution results.

```java { .api }
/**
 * Execute the streaming job
 * @return JobExecutionResult containing execution information
 */
JobExecutionResult execute() throws Exception;

/**
 * Execute the streaming job with a custom name
 * @param jobName - the name of the job
 * @return JobExecutionResult containing execution information
 */
JobExecutionResult execute(String jobName) throws Exception;

/**
 * Execute the streaming job asynchronously
 * @return JobClient for managing the running job
 */
JobClient executeAsync() throws Exception;

/**
 * Execute the streaming job asynchronously with a custom name
 * @param jobName - the name of the job
 * @return JobClient for managing the running job
 */
JobClient executeAsync(String jobName) throws Exception;
```

**Usage Examples:**

```java
// Execute with the default job name (blocks until the job finishes)
JobExecutionResult result = env.execute();

// Alternatively, execute with a custom name. Use one of the two calls per job:
// JobExecutionResult result = env.execute("My Streaming Job");

// Access execution results
System.out.println("Job ID: " + result.getJobID());
System.out.println("Execution Time: " + result.getNetRuntime() + " ms");
```
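
The asynchronous variants return a `JobClient` instead of blocking. The sketch below submits a small bounded job, prints its ID, and then waits for the result. It assumes Flink 1.12 or later, where `JobClient.getJobExecutionResult()` takes no arguments (earlier releases required a class loader); `ExecuteAsyncSketch` and `submitAndWait` are illustrative names.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteAsyncSketch {

    /** Submits a job without blocking, then waits for it to finish. */
    public static JobExecutionResult submitAndWait() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        // A bounded pipeline, so the job terminates on its own.
        env.fromSequence(1, 3).print();

        // Returns as soon as the job has been submitted.
        JobClient client = env.executeAsync("async-sketch");
        System.out.println("Submitted job " + client.getJobID());

        // The client exposes futures; get() blocks until the job completes.
        return client.getJobExecutionResult().get();
    }

    public static void main(String[] args) throws Exception {
        JobExecutionResult result = submitAndWait();
        System.out.println("Finished in " + result.getNetRuntime() + " ms");
    }
}
```

For an unbounded job, the same `JobClient` can be kept around to query the status with `getJobStatus()` or stop the job with `cancel()` rather than waiting on the result.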

### Configuration

Configure runtime settings and execution parameters.

```java { .api }
/**
 * Set the parallelism for operations executed through this environment
 * @param parallelism - the parallelism
 */
StreamExecutionEnvironment setParallelism(int parallelism);

/**
 * Get the default parallelism
 */
int getParallelism();

/**
 * Set the maximum parallelism
 * @param maxParallelism - the maximum parallelism
 */
StreamExecutionEnvironment setMaxParallelism(int maxParallelism);

/**
 * Get the maximum parallelism
 */
int getMaxParallelism();

/**
 * Enable checkpointing for fault tolerance
 * @param interval - checkpoint interval in milliseconds
 */
StreamExecutionEnvironment enableCheckpointing(long interval);

/**
 * Enable checkpointing with mode
 * @param interval - checkpoint interval in milliseconds
 * @param mode - checkpointing mode (EXACTLY_ONCE or AT_LEAST_ONCE)
 */
StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode);

/**
 * Get the checkpoint configuration
 */
CheckpointConfig getCheckpointConfig();

/**
 * Set the time characteristic for the application (deprecated)
 * @deprecated Time characteristics are deprecated. Use source-based watermark assignment instead.
 * @param characteristic - the time characteristic (EventTime, ProcessingTime, IngestionTime)
 */
@Deprecated
StreamExecutionEnvironment setStreamTimeCharacteristic(TimeCharacteristic characteristic);

/**
 * Set the runtime execution mode (batch or streaming)
 * @param executionMode - BATCH for bounded data, STREAMING for unbounded data
 * @return StreamExecutionEnvironment for method chaining
 */
StreamExecutionEnvironment setRuntimeMode(RuntimeExecutionMode executionMode);

/**
 * Get the time characteristic (deprecated)
 * @deprecated Deprecated together with setStreamTimeCharacteristic()
 */
@Deprecated
TimeCharacteristic getStreamTimeCharacteristic();

/**
 * Set the maximum time that output buffers wait before being flushed
 * @param timeoutMillis - timeout in milliseconds
 */
StreamExecutionEnvironment setBufferTimeout(long timeoutMillis);

/**
 * Get the buffer timeout
 */
long getBufferTimeout();
```

**Usage Examples:**

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Set parallelism
env.setParallelism(4);
env.setMaxParallelism(128);

// Enable checkpointing every 5 seconds (exactly-once is the default mode)
env.enableCheckpointing(5000);
// Or state the mode explicitly; this call replaces the setting from the line above
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

// Configure checkpointing (all values in milliseconds)
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(60000);

// Set time characteristic (deprecated; shown for legacy code only)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Flush output buffers at least every 100 ms
env.setBufferTimeout(100);
```
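
`setRuntimeMode` is not covered by the examples above. As a rough illustration of what the mode changes, the sketch below runs the same keyed aggregation in either mode: in `STREAMING` mode `sum` emits an updated count for every incoming record, while in `BATCH` mode only the final count per key is emitted. It assumes Flink 1.12 or later; `RuntimeModeSketch` and `countWords` are illustrative names.

```java
import java.util.List;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeSketch {

    /** Counts words from a small bounded input using the given runtime mode. */
    public static List<Tuple2<String, Integer>> countWords(RuntimeExecutionMode mode)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.setRuntimeMode(mode);

        return env.fromElements("a", "b", "a")
                .map(word -> Tuple2.of(word, 1))
                // Tuple types are erased in lambdas, so declare the result type.
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(pair -> pair.f0)
                .sum(1)
                .executeAndCollect(10);
    }

    public static void main(String[] args) throws Exception {
        // BATCH: one final record per key, here (a,2) and (b,1) in some order.
        System.out.println(countWords(RuntimeExecutionMode.BATCH));
        // STREAMING: one record per input element, including intermediate counts.
        System.out.println(countWords(RuntimeExecutionMode.STREAMING));
    }
}
```

`BATCH` mode only applies to jobs whose sources are all bounded. The mode can also be supplied at submission time through the `execution.runtime-mode` configuration option, which keeps the application code mode-agnostic.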

### Execution Configuration

Access and modify execution configuration settings.

```java { .api }
/**
 * Get the execution configuration
 */
ExecutionConfig getConfig();
```

**Usage Examples:**

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig config = env.getConfig();

// Configure execution settings (intervals in milliseconds)
config.setAutoWatermarkInterval(1000);
config.setLatencyTrackingInterval(2000);
config.enableObjectReuse();
// args is the String[] passed to main()
config.setGlobalJobParameters(ParameterTool.fromArgs(args));
```

## Types

### Environment Types

```java { .api }
class StreamExecutionEnvironment {
    // Factory methods, source creation, execution, and configuration methods as above
}

class LocalStreamEnvironment extends StreamExecutionEnvironment {
    // Local execution environment implementation
}

class RemoteStreamEnvironment extends StreamExecutionEnvironment {
    // Remote cluster execution environment implementation
}

// Data source type
class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
    // Represents a data source in the streaming topology
}
```

### Configuration Types

```java { .api }
enum TimeCharacteristic {
    ProcessingTime,  // Processing time semantics
    IngestionTime,   // Ingestion time semantics
    EventTime        // Event time semantics
}

enum CheckpointingMode {
    EXACTLY_ONCE,    // Exactly-once processing guarantees
    AT_LEAST_ONCE    // At-least-once processing guarantees
}

class ExecutionConfig {
    // Configuration for job execution
    void setAutoWatermarkInterval(long interval);
    void setLatencyTrackingInterval(long interval);
    void enableObjectReuse();
    void setGlobalJobParameters(GlobalJobParameters parameters);
}

class CheckpointConfig {
    // Configuration for checkpointing
    void setCheckpointingMode(CheckpointingMode mode);
    void setMinPauseBetweenCheckpoints(long minPause);
    void setCheckpointTimeout(long timeout);
    void setMaxConcurrentCheckpoints(int maxConcurrent);
}
```

### Job Execution Results

```java { .api }
class JobExecutionResult {
    // Result of job execution
    JobID getJobID();
    long getNetRuntime();
    long getNetRuntime(TimeUnit desiredUnit);
    Map<String, Object> getAllAccumulatorResults();
    <T> T getAccumulatorResult(String accumulatorName);
}
```
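
As an illustration of `getAccumulatorResult`, the sketch below registers a counter inside a rich function and reads its value from the `JobExecutionResult` once the job has finished. It assumes a Flink 1.x release, where rich functions override `open(Configuration)`; the class names `AccumulatorSketch` and `CountingMapper`, the method `countRecords`, and the accumulator name `"records-seen"` are all illustrative.

```java
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AccumulatorSketch {

    /** Passes records through unchanged while counting them in an accumulator. */
    static class CountingMapper extends RichMapFunction<String, String> {
        private final IntCounter seen = new IntCounter();

        @Override
        public void open(Configuration parameters) {
            // Register the counter under the name used to look it up later.
            getRuntimeContext().addAccumulator("records-seen", seen);
        }

        @Override
        public String map(String value) {
            seen.add(1);
            return value;
        }
    }

    /** Runs a bounded job and returns the accumulated record count. */
    public static int countRecords() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        env.fromElements("hello", "world", "flink")
                .map(new CountingMapper())
                .print();

        // execute() blocks until the bounded job finishes.
        JobExecutionResult result = env.execute("accumulator-sketch");
        Integer count = result.getAccumulatorResult("records-seen");
        return count;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Records seen: " + countRecords());
    }
}
```

Accumulator values from all parallel subtasks are merged before they reach the client, so the result reflects the whole job rather than a single task.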