# Stream Execution Environment

The `StreamExecutionEnvironment` is the main entry point for Flink streaming applications. It provides methods for creating data sources, configuring execution parameters, and managing the job lifecycle.

## Capabilities

### Environment Creation

Factory methods on the companion object for obtaining a context-dependent, local, or remote execution environment.

```scala { .api }
/**
 * Factory methods for obtaining an execution environment
 */
object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(): StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.scala._

// Get execution environment (local or cluster depending on context)
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Force local execution
val localEnv = StreamExecutionEnvironment.createLocalEnvironment()

// Remote cluster execution
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "path/to/job.jar")
```

### Configuration Management

Methods for configuring parallelism, runtime mode, buffer timeout, and operator chaining.

```scala { .api }
/**
 * Configure parallelism and execution mode
 */
class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def setMaxParallelism(maxParallelism: Int): Unit
  def getMaxParallelism: Int
  def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment
  def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment
  def getBufferTimeout: Long
  def disableOperatorChaining(): StreamExecutionEnvironment
}
```
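
As a rough illustration of how these setters combine, the sketch below configures a local environment. It assumes Flink's Scala API is on the classpath and that `RuntimeExecutionMode` is imported from `org.apache.flink.api.common`; the values (4, 128, 100 ms) are arbitrary examples, not recommendations.

```scala
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment()

// These two setters return Unit, so they are called as separate statements
env.setParallelism(4)      // default parallelism for operators of this job
env.setMaxParallelism(128) // upper bound for later rescaling (example value)

// The remaining setters return the environment and can be chained
env
  .setRuntimeMode(RuntimeExecutionMode.STREAMING)
  .setBufferTimeout(100) // flush network buffers after at most 100 ms
  .disableOperatorChaining()
```

Because `setParallelism` and `setMaxParallelism` return `Unit` in the signatures above, only the last three calls are written as a chain.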

### Checkpointing Configuration

Enable and configure checkpointing for fault tolerance. The checkpoint interval is given in milliseconds.

```scala { .api }
/**
 * Configure checkpointing for fault tolerance
 */
class StreamExecutionEnvironment {
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
  def enableCheckpointing(interval: Long, mode: CheckpointingMode): StreamExecutionEnvironment
  def getCheckpointConfig: CheckpointConfig
}
```

**Usage Examples:**

```scala
import org.apache.flink.streaming.api.CheckpointingMode

// Enable checkpointing every 5 seconds (interval in milliseconds)
env.enableCheckpointing(5000)

// Configure checkpoint mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

// Advanced checkpoint configuration
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setMinPauseBetweenCheckpoints(500) // wait at least 500 ms between checkpoints
checkpointConfig.setCheckpointTimeout(60000)        // discard checkpoints that take longer than 60 s
```

### Data Source Creation

Methods for creating data streams from various sources.

```scala { .api }
/**
 * Create data streams from various sources
 */
class StreamExecutionEnvironment {
  def fromCollection[T](data: Seq[T]): DataStream[T]
  def fromElements[T](data: T*): DataStream[T]
  def fromParallelCollection[T](data: SplittableIterator[T]): DataStream[T]
  def generateSequence(from: Long, to: Long): DataStream[Long]
  def readTextFile(filePath: String): DataStream[String]
  def readFile[T](inputFormat: FileInputFormat[T], filePath: String): DataStream[T]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
  def addSource[T](function: SourceFunction[T]): DataStream[T]
  def fromSource[T](source: Source[T, _, _], watermarkStrategy: WatermarkStrategy[T], sourceName: String): DataStream[T]
}
```

**Usage Examples:**

```scala
// Create from collection
val dataStream = env.fromCollection(List(1, 2, 3, 4, 5))

// Create from elements
val elementsStream = env.fromElements("hello", "world", "flink")

// Read from file
val fileStream = env.readTextFile("path/to/input.txt")

// Socket stream for testing
val socketStream = env.socketTextStream("localhost", 9999)

// Custom source function (MyCustomSourceFunction is a user-defined SourceFunction, not shown here)
val customStream = env.addSource(new MyCustomSourceFunction())
```
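
The examples above do not cover `fromSource`, so here is a minimal sketch of it. It assumes a Flink version that ships `NumberSequenceSource` (in `org.apache.flink.api.connector.source.lib`) and uses `WatermarkStrategy.noWatermarks` because the example needs no event time; the source name `"number-sequence"` is an arbitrary label chosen for illustration.

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.connector.source.lib.NumberSequenceSource
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

// Bounded source that emits the numbers 1 to 5 (both inclusive) as java.lang.Long
val source = new NumberSequenceSource(1L, 5L)

val numbers: DataStream[java.lang.Long] = env.fromSource(
  source,
  WatermarkStrategy.noWatermarks[java.lang.Long](), // no event-time watermarks needed here
  "number-sequence"                                  // display name of the source operator
)
```

The element type is `java.lang.Long` rather than Scala's `Long` because the source comes from Flink's Java API; a `map(_.toLong)` would convert it if a Scala `Long` stream is preferred.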

### Job Execution

Methods for executing the streaming job, either waiting for its result or submitting it asynchronously, and for inspecting the generated stream graph.

```scala { .api }
/**
 * Execute the streaming job
 */
class StreamExecutionEnvironment {
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  def executeAsync(): JobClient
  def executeAsync(jobName: String): JobClient
  def getStreamGraph: StreamGraph
  def getStreamGraph(jobName: String): StreamGraph
}
```

**Usage Examples:**

```scala
// The calls below are alternatives; a program normally triggers execution with one of them

// Execute with default job name
env.execute()

// Execute with custom job name
env.execute("My Streaming Job")

// Async execution for non-blocking operation
val jobClient = env.executeAsync("Async Job")
```
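
To show where `execute` sits in a complete program, the following sketch builds a tiny bounded pipeline and runs it locally. The pipeline itself (doubling three integers) and the job name are made up for illustration; only the environment and execution calls come from the API listed above.

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

// A job needs at least one source and one sink before it can be executed
env.fromElements(1, 2, 3).map(_ * 2).print()

// In a local environment this call returns once the bounded job has finished
val result = env.execute("Doubling Job")
println(s"Job ${result.getJobID} ran for ${result.getNetRuntime} ms")
```

Nothing runs until `execute` is called: the preceding lines only describe the dataflow, which is why the call belongs at the end of the program.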

### State Backend Configuration

Configure state backends for stateful operations.

```scala { .api }
/**
 * Configure state backend
 */
class StreamExecutionEnvironment {
  def setStateBackend(backend: StateBackend): StreamExecutionEnvironment
  def getStateBackend: StateBackend
}
```
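
A possible way to use these methods is sketched below. It assumes a Flink version that provides `HashMapStateBackend` and `CheckpointConfig.setCheckpointStorage`; the checkpoint directory is a hypothetical path, and other backends can be passed to `setStateBackend` in the same way.

```scala
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment()

// Keep working state as objects on the JVM heap
env.setStateBackend(new HashMapStateBackend())

// Checkpoint every 10 seconds and write snapshots to a file system location
env.enableCheckpointing(10000)
env.getCheckpointConfig.setCheckpointStorage("file:///tmp/flink-checkpoints") // hypothetical path
```

The state backend decides how state is held while the job runs, whereas the checkpoint storage decides where snapshots are persisted, so the two are configured separately in this sketch.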

### Configuration Access

Access to the execution configuration and to files registered in the distributed cache.

```scala { .api }
/**
 * Access configuration and resources
 */
class StreamExecutionEnvironment {
  def getConfig: ExecutionConfig
  def getCachedFiles: java.util.List[Tuple2[String, DistributedCacheEntry]] // Flink's Java Tuple2
  def registerCachedFile(filePath: String, name: String): Unit
  def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit
}
```
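
The sketch below shows one way these accessors might be used together. The watermark interval and the file path are illustrative assumptions, and `"lookup"` is just an example name for the cached file.

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironment()

// Adjust job-wide settings through the ExecutionConfig
val config = env.getConfig
config.setAutoWatermarkInterval(200) // emit periodic watermarks every 200 ms

// Register a file in the distributed cache under the name "lookup" (hypothetical path)
env.registerCachedFile("file:///tmp/lookup.csv", "lookup")
```

Inside a rich function, a file registered this way can then be looked up by name with `getRuntimeContext.getDistributedCache.getFile("lookup")`.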

## Types

```scala { .api }
// Main environment class
class StreamExecutionEnvironment(javaEnv: JavaEnv)

// Execution modes and configuration
enum RuntimeExecutionMode {
  STREAMING, BATCH, AUTOMATIC
}

enum CheckpointingMode {
  EXACTLY_ONCE, AT_LEAST_ONCE
}

// Job execution results
trait JobExecutionResult
trait JobClient

// Configuration classes
class ExecutionConfig
class CheckpointConfig
class StreamGraph
```