or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-io.md data-streams.md execution-environment.md index.md keyed-streams.md processing-functions.md sinks-output.md stream-connections.md window-functions.md windowing.md

docs/execution-environment.md

0

# Execution Environment

1

2

The StreamExecutionEnvironment is the main entry point for creating and configuring Flink streaming applications. It provides factory methods for creating environments, configuration options, and execution control.

3

4

## Capabilities

5

6

### Environment Creation

7

8

Factory methods for creating different types of execution environments.

9

10

```scala { .api }

11

object StreamExecutionEnvironment {

12

/**

13

* Creates the execution environment based on the context (local vs cluster)

14

* @return StreamExecutionEnvironment instance

15

*/

16

def getExecutionEnvironment: StreamExecutionEnvironment

17

18

/**

19

* Creates a local execution environment with specified parallelism

20

* @param parallelism The parallelism for local execution

21

* @return Local StreamExecutionEnvironment

22

*/

23

def createLocalEnvironment(parallelism: Int = getDefaultLocalParallelism): StreamExecutionEnvironment

24

25

/**

26

* Creates a local environment with web UI for monitoring

27

* @param config Optional configuration

28

* @return Local environment with web UI

29

*/

30

def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment

31

32

/**

33

* Creates a remote execution environment

34

* @param host Remote JobManager host

35

* @param port Remote JobManager port

36

* @param jarFiles JAR files to submit with the job

37

* @return Remote StreamExecutionEnvironment

38

*/

39

def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment

40

41

/**

42

* Creates a remote execution environment with parallelism

43

* @param host Remote JobManager host

44

* @param port Remote JobManager port

45

* @param parallelism Parallelism for the job

46

* @param jarFiles JAR files to submit with the job

47

* @return Remote StreamExecutionEnvironment

48

*/

49

def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*): StreamExecutionEnvironment

50

}

51

```

52

53

**Usage Examples:**

54

55

```scala

56

import org.apache.flink.streaming.api.scala._

57

58

// Automatic environment detection (recommended)

59

val env = StreamExecutionEnvironment.getExecutionEnvironment

60

61

// Local environment for testing

62

val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)

63

64

// Remote environment for cluster submission

65

val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(

66

"jobmanager-host", 8081, "my-application.jar"

67

)

68

```

69

70

### Parallelism Configuration

71

72

Control the parallelism settings for stream operations.

73

74

```scala { .api }

75

class StreamExecutionEnvironment {

76

/**

77

* Sets the default parallelism for all operators

78

* @param parallelism The parallelism degree

79

*/

80

def setParallelism(parallelism: Int): Unit

81

82

/**

83

* Sets the maximum parallelism for all operators

84

* @param maxParallelism The maximum parallelism degree

85

*/

86

def setMaxParallelism(maxParallelism: Int): Unit

87

88

/**

89

* Gets the default parallelism

90

* @return Current parallelism setting

91

*/

92

def getParallelism: Int

93

94

/**

95

* Gets the maximum parallelism

96

* @return Current maximum parallelism setting

97

*/

98

def getMaxParallelism: Int

99

}

100

```

101

102

### Runtime Configuration

103

104

Configure runtime behavior and execution modes.

105

106

```scala { .api }

107

class StreamExecutionEnvironment {

108

/**

109

* Sets the runtime execution mode (batch vs streaming)

110

* @param executionMode The runtime execution mode

111

* @return This environment for chaining

112

*/

113

def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment

114

115

/**

116

* Sets the buffer timeout for network buffers

117

* @param timeoutMillis Timeout in milliseconds

118

* @return This environment for chaining

119

*/

120

def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment

121

122

/**

123

* Gets the buffer timeout

124

* @return Current buffer timeout in milliseconds

125

*/

126

def getBufferTimeout: Long

127

128

/**

129

* Disables operator chaining globally

130

* @return This environment for chaining

131

*/

132

def disableOperatorChaining(): StreamExecutionEnvironment

133

}

134

```

135

136

### Checkpointing Configuration

137

138

Enable and configure checkpointing for fault tolerance.

139

140

```scala { .api }

141

class StreamExecutionEnvironment {

142

/**

143

* Enables checkpointing with specified interval

144

* @param interval Checkpoint interval in milliseconds

145

* @return This environment for chaining

146

*/

147

def enableCheckpointing(interval: Long): StreamExecutionEnvironment

148

149

/**

150

* Enables checkpointing with interval and mode

151

* @param interval Checkpoint interval in milliseconds

152

* @param mode Checkpointing mode (exactly-once or at-least-once)

153

* @return This environment for chaining

154

*/

155

def enableCheckpointing(interval: Long, mode: CheckpointingMode): StreamExecutionEnvironment

156

157

/**

158

* Gets the checkpoint configuration

159

* @return CheckpointConfig for fine-tuning

160

*/

161

def getCheckpointConfig: CheckpointConfig

162

163

/**

164

* Sets the state backend for checkpointing

165

* @param backend The state backend implementation

166

* @return This environment for chaining

167

*/

168

def setStateBackend(backend: StateBackend): StreamExecutionEnvironment

169

}

170

```

171

172

**Usage Examples:**

173

174

```scala

175

import org.apache.flink.streaming.api.scala._

176

import org.apache.flink.core.execution.CheckpointingMode

177

178

val env = StreamExecutionEnvironment.getExecutionEnvironment

179

180

// Enable checkpointing every 5 seconds with exactly-once semantics

181

env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

182

183

// Configure checkpoint settings

184

val checkpointConfig = env.getCheckpointConfig

185

checkpointConfig.setMinPauseBetweenCheckpoints(500)

186

checkpointConfig.setCheckpointTimeout(60000)

187

```

188

189

### Restart Strategies

190

191

Configure failure recovery behavior.

192

193

```scala { .api }

194

class StreamExecutionEnvironment {

195

/**

196

* Sets the restart strategy configuration

197

* @param restartStrategyConfiguration Restart strategy configuration

198

*/

199

def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit

200

201

/**

202

* Gets the current restart strategy

203

* @return Current restart strategy configuration

204

*/

205

def getRestartStrategy: RestartStrategyConfiguration

206

}

207

```

208

209

### Data Source Creation

210

211

Create data sources for streaming applications.

212

213

```scala { .api }

214

class StreamExecutionEnvironment {

215

/**

216

* Creates a DataStream from a sequence of elements

217

* @param data Elements to include in the stream

218

* @return DataStream containing the elements

219

*/

220

def fromElements[T: TypeInformation](data: T*): DataStream[T]

221

222

/**

223

* Creates a DataStream from a collection

224

* @param data Collection to convert to stream

225

* @return DataStream containing collection elements

226

*/

227

def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]

228

229

/**

230

* Creates a DataStream from an iterator

231

* @param data Iterator to convert to stream

232

* @return DataStream containing iterator elements

233

*/

234

def fromCollection[T: TypeInformation](data: Iterator[T]): DataStream[T]

235

236

/**

237

* Creates a DataStream from a number sequence

238

* @param from Starting number (inclusive)

239

* @param to Ending number (inclusive)

240

* @return DataStream of Long numbers

241

*/

242

def fromSequence(from: Long, to: Long): DataStream[Long]

243

244

/**

245

* Reads a text file as a DataStream

246

* @param filePath Path to the text file

247

* @return DataStream of file lines

248

*/

249

def readTextFile(filePath: String): DataStream[String]

250

251

/**

252

* Creates a socket text stream

253

* @param hostname Host to connect to

254

* @param port Port to connect to

255

* @param delimiter Line delimiter character

256

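* @param maxRetry Maximum retry interval in seconds while waiting for a temporarily unavailable socket (0 terminates immediately; a negative value retries forever)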

257

* @return DataStream of socket text lines

258

*/

259

def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0): DataStream[String]

260

261

/**

262

* Adds a custom source function

263

* @param function Source function implementation

264

* @return DataStream from the source

265

*/

266

def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]

267

268

/**

269

* Creates a DataStream from a source built on the new (unified) Source interface

270

* @param source Source implementation

271

* @param watermarkStrategy Watermark strategy for event time

272

* @param sourceName Name for the source operator

273

* @return DataStream from the source

274

*/

275

def fromSource[T: TypeInformation](

276

source: Source[T, _ <: SourceSplit, _],

277

watermarkStrategy: WatermarkStrategy[T],

278

sourceName: String

279

): DataStream[T]

280

}

281

```

282

283

### Job Execution

284

285

Execute streaming jobs and retrieve results.

286

287

```scala { .api }

288

class StreamExecutionEnvironment {

289

/**

290

* Executes the streaming job

291

* @return JobExecutionResult with execution details

292

*/

293

def execute(): JobExecutionResult

294

295

/**

296

* Executes the streaming job with a name

297

* @param jobName Name for the job

298

* @return JobExecutionResult with execution details

299

*/

300

def execute(jobName: String): JobExecutionResult

301

302

/**

303

* Executes the job asynchronously

304

* @return JobClient for monitoring the job

305

*/

306

def executeAsync(): JobClient

307

308

/**

309

* Executes the job asynchronously with a name

310

* @param jobName Name for the job

311

* @return JobClient for monitoring the job

312

*/

313

def executeAsync(jobName: String): JobClient

314

315

/**

316

* Gets the execution plan as a JSON string

317

* @return Execution plan representation

318

*/

319

def getExecutionPlan: String

320

321

/**

322

* Gets the StreamGraph representation

323

* @return StreamGraph for the defined transformations

324

*/

325

def getStreamGraph: StreamGraph

326

}

327

```

328

329

**Usage Examples:**

330

331

```scala

332

import org.apache.flink.streaming.api.scala._

333

334

val env = StreamExecutionEnvironment.getExecutionEnvironment

335

336

// Define your streaming pipeline

337

val stream = env.fromElements(1, 2, 3, 4, 5)

338

.map(_ * 2)

339

.print()

340

341

// Execute synchronously

342

val result = env.execute("My Streaming Job")

343

println(s"Job completed in ${result.getJobExecutionTime} ms")

344

345

// Or execute asynchronously

346

val jobClient = env.executeAsync("My Async Job")

347

// Monitor job status with jobClient

348

```

349

350

## Types

351

352

```scala { .api }

353

// Runtime execution modes

354

sealed trait RuntimeExecutionMode

355

object RuntimeExecutionMode {

356

case object STREAMING extends RuntimeExecutionMode

357

case object BATCH extends RuntimeExecutionMode

358

case object AUTOMATIC extends RuntimeExecutionMode

359

}

360

361

// Checkpointing modes

362

sealed trait CheckpointingMode

363

object CheckpointingMode {

364

case object EXACTLY_ONCE extends CheckpointingMode

365

case object AT_LEAST_ONCE extends CheckpointingMode

366

}

367

368

// Job execution result

369

trait JobExecutionResult {

370

def getJobExecutionTime: Long

371

def getAccumulatorResult[V](accumulatorName: String): V

372

def getAllAccumulatorResults: java.util.Map[String, AnyRef]

373

}

374

375

// Job client for async execution

376

trait JobClient {

377

def getJobID: JobID

378

def getJobStatus: CompletableFuture[JobStatus]

379

def cancel(): CompletableFuture[Void]

380

def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]

381

}

382

```