0
# Execution Environment
1
2
The StreamExecutionEnvironment is the main entry point for creating and configuring Flink streaming applications. It provides factory methods for creating environments, configuration options, and execution control.
3
4
## Capabilities
5
6
### Environment Creation
7
8
Factory methods for creating different types of execution environments.
9
10
```scala { .api }
11
object StreamExecutionEnvironment {
12
/**
13
* Creates the execution environment based on the context (local vs cluster)
14
* @return StreamExecutionEnvironment instance
15
*/
16
def getExecutionEnvironment: StreamExecutionEnvironment
17
18
/**
19
* Creates a local execution environment with specified parallelism
20
* @param parallelism The parallelism for local execution
21
* @return Local StreamExecutionEnvironment
22
*/
23
def createLocalEnvironment(parallelism: Int = getDefaultLocalParallelism): StreamExecutionEnvironment
24
25
/**
26
* Creates a local environment with web UI for monitoring
27
* @param config Optional configuration
28
* @return Local environment with web UI
29
*/
30
def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment
31
32
/**
33
* Creates a remote execution environment
34
* @param host Remote JobManager host
35
* @param port Remote JobManager port
36
* @param jarFiles JAR files to submit with the job
37
* @return Remote StreamExecutionEnvironment
38
*/
39
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
40
41
/**
42
* Creates a remote execution environment with parallelism
43
* @param host Remote JobManager host
44
* @param port Remote JobManager port
45
* @param parallelism Parallelism for the job
46
* @param jarFiles JAR files to submit with the job
47
* @return Remote StreamExecutionEnvironment
48
*/
49
def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*): StreamExecutionEnvironment
50
}
51
```
52
53
**Usage Examples:**
54
55
```scala
56
import org.apache.flink.streaming.api.scala._
57
58
// Automatic environment detection (recommended)
59
val env = StreamExecutionEnvironment.getExecutionEnvironment
60
61
// Local environment for testing
62
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)
63
64
// Remote environment for cluster submission
65
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
66
"jobmanager-host", 8081, "my-application.jar"
67
)
68
```
69
70
### Parallelism Configuration
71
72
Control the parallelism settings for stream operations.
73
74
```scala { .api }
75
class StreamExecutionEnvironment {
76
/**
77
* Sets the default parallelism for all operators
78
* @param parallelism The parallelism degree
79
*/
80
def setParallelism(parallelism: Int): Unit
81
82
/**
83
* Sets the maximum parallelism for all operators
84
* @param maxParallelism The maximum parallelism degree
85
*/
86
def setMaxParallelism(maxParallelism: Int): Unit
87
88
/**
89
* Gets the default parallelism
90
* @return Current parallelism setting
91
*/
92
def getParallelism: Int
93
94
/**
95
* Gets the maximum parallelism
96
* @return Current maximum parallelism setting
97
*/
98
def getMaxParallelism: Int
99
}
100
```
101
102
### Runtime Configuration
103
104
Configure runtime behavior and execution modes.
105
106
```scala { .api }
107
class StreamExecutionEnvironment {
108
/**
109
* Sets the runtime execution mode (STREAMING, BATCH, or AUTOMATIC)
110
* @param executionMode The runtime execution mode
111
* @return This environment for chaining
112
*/
113
def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment
114
115
/**
116
* Sets the maximum time that output buffers may wait before being flushed downstream
117
* @param timeoutMillis Timeout in milliseconds
118
* @return This environment for chaining
119
*/
120
def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment
121
122
/**
123
* Gets the buffer timeout
124
* @return Current buffer timeout in milliseconds
125
*/
126
def getBufferTimeout: Long
127
128
/**
129
* Disables operator chaining globally
130
* @return This environment for chaining
131
*/
132
def disableOperatorChaining(): StreamExecutionEnvironment
133
}
134
```
135
136
### Checkpointing Configuration
137
138
Enable and configure checkpointing for fault tolerance.
139
140
```scala { .api }
141
class StreamExecutionEnvironment {
142
/**
143
* Enables checkpointing with specified interval
144
* @param interval Checkpoint interval in milliseconds
145
* @return This environment for chaining
146
*/
147
def enableCheckpointing(interval: Long): StreamExecutionEnvironment
148
149
/**
150
* Enables checkpointing with interval and mode
151
* @param interval Checkpoint interval in milliseconds
152
* @param mode Checkpointing mode (exactly-once or at-least-once)
153
* @return This environment for chaining
154
*/
155
def enableCheckpointing(interval: Long, mode: CheckpointingMode): StreamExecutionEnvironment
156
157
/**
158
* Gets the checkpoint configuration
159
* @return CheckpointConfig for fine-tuning
160
*/
161
def getCheckpointConfig: CheckpointConfig
162
163
/**
164
* Sets the state backend for checkpointing
165
* @param backend The state backend implementation
166
* @return This environment for chaining
167
*/
168
def setStateBackend(backend: StateBackend): StreamExecutionEnvironment
169
}
170
```
171
172
**Usage Examples:**
173
174
```scala
175
import org.apache.flink.streaming.api.scala._
176
import org.apache.flink.core.execution.CheckpointingMode
177
178
val env = StreamExecutionEnvironment.getExecutionEnvironment
179
180
// Enable checkpointing every 5 seconds with exactly-once semantics
181
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
182
183
// Configure checkpoint settings
184
val checkpointConfig = env.getCheckpointConfig
185
checkpointConfig.setMinPauseBetweenCheckpoints(500)
186
checkpointConfig.setCheckpointTimeout(60000)
187
```
188
189
### Restart Strategies
190
191
Configure failure recovery behavior.
192
193
```scala { .api }
194
class StreamExecutionEnvironment {
195
/**
196
* Sets the restart strategy configuration
197
* @param restartStrategyConfiguration Restart strategy configuration
198
*/
199
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
200
201
/**
202
* Gets the current restart strategy
203
* @return Current restart strategy configuration
204
*/
205
def getRestartStrategy: RestartStrategyConfiguration
206
}
207
```
208
209
### Data Source Creation
210
211
Create data sources for streaming applications.
212
213
```scala { .api }
214
class StreamExecutionEnvironment {
215
/**
216
* Creates a DataStream from a sequence of elements
217
* @param data Elements to include in the stream
218
* @return DataStream containing the elements
219
*/
220
def fromElements[T: TypeInformation](data: T*): DataStream[T]
221
222
/**
223
* Creates a DataStream from a collection
224
* @param data Collection to convert to stream
225
* @return DataStream containing collection elements
226
*/
227
def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
228
229
/**
230
* Creates a DataStream from an iterator
231
* @param data Iterator to convert to stream
232
* @return DataStream containing iterator elements
233
*/
234
def fromCollection[T: TypeInformation](data: Iterator[T]): DataStream[T]
235
236
/**
237
* Creates a DataStream from a number sequence
238
* @param from Starting number (inclusive)
239
* @param to Ending number (inclusive)
240
* @return DataStream of Long numbers
241
*/
242
def fromSequence(from: Long, to: Long): DataStream[Long]
243
244
/**
245
* Reads a text file as a DataStream
246
* @param filePath Path to the text file
247
* @return DataStream of file lines
248
*/
249
def readTextFile(filePath: String): DataStream[String]
250
251
/**
252
* Creates a socket text stream
253
* @param hostname Host to connect to
254
* @param port Port to connect to
255
* @param delimiter Line delimiter character
256
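* @param maxRetry Maximum time in seconds to keep retrying while the socket is down (0 terminates immediately, a negative value retries forever)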
257
* @return DataStream of socket text lines
258
*/
259
def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0): DataStream[String]
260
261
/**
262
* Adds a custom source function
263
* @param function Source function implementation
264
* @return DataStream from the source
265
*/
266
def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]
267
268
/**
269
* Creates a DataStream from a new Source interface
270
* @param source Source implementation
271
* @param watermarkStrategy Watermark strategy for event time
272
* @param sourceName Name for the source operator
273
* @return DataStream from the source
274
*/
275
def fromSource[T: TypeInformation](
276
source: Source[T, _ <: SourceSplit, _],
277
watermarkStrategy: WatermarkStrategy[T],
278
sourceName: String
279
): DataStream[T]
280
}
281
```
282
283
### Job Execution
284
285
Execute streaming jobs and retrieve results.
286
287
```scala { .api }
288
class StreamExecutionEnvironment {
289
/**
290
* Executes the streaming job
291
* @return JobExecutionResult with execution details
292
*/
293
def execute(): JobExecutionResult
294
295
/**
296
* Executes the streaming job with a name
297
* @param jobName Name for the job
298
* @return JobExecutionResult with execution details
299
*/
300
def execute(jobName: String): JobExecutionResult
301
302
/**
303
* Executes the job asynchronously
304
* @return JobClient for monitoring the job
305
*/
306
def executeAsync(): JobClient
307
308
/**
309
* Executes the job asynchronously with a name
310
* @param jobName Name for the job
311
* @return JobClient for monitoring the job
312
*/
313
def executeAsync(jobName: String): JobClient
314
315
/**
316
* Gets the execution plan as a JSON string
317
* @return Execution plan representation
318
*/
319
def getExecutionPlan: String
320
321
/**
322
* Gets the StreamGraph representation
323
* @return StreamGraph for the defined transformations
324
*/
325
def getStreamGraph: StreamGraph
326
}
327
```
328
329
**Usage Examples:**
330
331
```scala
332
import org.apache.flink.streaming.api.scala._
333
334
val env = StreamExecutionEnvironment.getExecutionEnvironment
335
336
// Define your streaming pipeline
337
val stream = env.fromElements(1, 2, 3, 4, 5)
338
.map(_ * 2)
339
.print()
340
341
// Execute synchronously
342
val result = env.execute("My Streaming Job")
343
println(s"Job completed in ${result.getNetRuntime} ms")
344
345
// Alternatively, instead of execute(), submit the job without blocking (use one or the other for a given pipeline)
346
val jobClient = env.executeAsync("My Async Job")
347
// Monitor job status with jobClient
348
```
349
350
## Types
351
352
```scala { .api }
353
// Runtime execution modes
354
sealed trait RuntimeExecutionMode
355
object RuntimeExecutionMode {
356
case object STREAMING extends RuntimeExecutionMode
357
case object BATCH extends RuntimeExecutionMode
358
case object AUTOMATIC extends RuntimeExecutionMode
359
}
360
361
// Checkpointing modes
362
sealed trait CheckpointingMode
363
object CheckpointingMode {
364
case object EXACTLY_ONCE extends CheckpointingMode
365
case object AT_LEAST_ONCE extends CheckpointingMode
366
}
367
368
// Job execution result
369
trait JobExecutionResult {
370
def getNetRuntime: Long
371
def getAccumulatorResult[V](accumulatorName: String): V
372
def getAllAccumulatorResults: java.util.Map[String, AnyRef]
373
}
374
375
// Job client for async execution
376
trait JobClient {
377
def getJobID: JobID
378
def getJobStatus: CompletableFuture[JobStatus]
379
def cancel(): CompletableFuture[Void]
380
def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
381
}
382
```