
# Execution Environment

The ExecutionEnvironment is the primary entry point for Flink programs, providing the context in which jobs are executed and methods to create DataSets from various sources.

## Capabilities

### Environment Creation

Factory methods for creating execution environments with different configurations.

```scala { .api }
object ExecutionEnvironment {
  /**
   * Creates an execution environment based on context (local or remote)
   * @return ExecutionEnvironment instance
   */
  def getExecutionEnvironment: ExecutionEnvironment

  /**
   * Creates a local execution environment
   * @param parallelism Degree of parallelism (default: number of CPU cores)
   * @return Local ExecutionEnvironment
   */
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment

  /**
   * Creates a local environment with web UI
   * @param config Optional configuration
   * @return Local ExecutionEnvironment with web interface
   */
  def createLocalEnvironmentWithWebUI(config: Configuration = null): ExecutionEnvironment

  /**
   * Creates environment for remote cluster execution
   * @param host Cluster host address
   * @param port Cluster port
   * @param jarFiles JAR files to distribute to cluster
   * @return Remote ExecutionEnvironment
   */
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment

  /**
   * Creates environment for testing with collections
   * @return Collections-based ExecutionEnvironment
   */
  def createCollectionsEnvironment: ExecutionEnvironment
}
```

**Usage Examples:**

```scala
import org.apache.flink.api.scala._

// Context-aware environment (local or remote based on context)
val env = ExecutionEnvironment.getExecutionEnvironment

// Local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(4)

// Remote cluster environment
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  "cluster-host", 6123, "my-job.jar"
)
```

### Environment Configuration

Methods for configuring execution parameters and behavior.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Sets the parallelism for operations in this environment
   * @param parallelism Degree of parallelism
   */
  def setParallelism(parallelism: Int): Unit

  /**
   * Gets the current parallelism setting
   * @return Current parallelism level
   */
  def getParallelism: Int

  /**
   * Gets the execution configuration object
   * @return ExecutionConfig for fine-tuning behavior
   */
  def getConfig: ExecutionConfig

  /**
   * Configures the environment with settings
   * @param configuration Configuration object
   * @param classLoader Class loader for user code
   */
  def configure(configuration: ReadableConfig, classLoader: ClassLoader): Unit

  /**
   * Sets the buffer timeout for network transfers
   * @param timeoutMillis Timeout in milliseconds
   */
  def setBufferTimeout(timeoutMillis: Long): Unit

  /**
   * Gets the current buffer timeout setting
   * @return Current buffer timeout in milliseconds
   */
  def getBufferTimeout: Long

  /**
   * Enables or disables object reuse mode for better performance
   * @param objectReuse Whether to enable object reuse
   */
  def setObjectReuse(objectReuse: Boolean): Unit

  /**
   * Gets the current object reuse setting
   * @return True if object reuse is enabled
   */
  def getObjectReuse: Boolean

  /**
   * Sets the default maximum degree of parallelism
   * @param maxParallelism Maximum parallelism level
   */
  def setMaxParallelism(maxParallelism: Int): Unit

  /**
   * Gets the default maximum degree of parallelism
   * @return Maximum parallelism level
   */
  def getMaxParallelism: Int

  /**
   * Sets the number of execution retries for failed jobs
   * @param numberOfExecutionRetries Number of retries
   */
  def setNumberOfExecutionRetries(numberOfExecutionRetries: Int): Unit

  /**
   * Gets the number of execution retries
   * @return Number of retries configured
   */
  def getNumberOfExecutionRetries: Int
}
```
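
**Usage Example:**

A minimal sketch of tuning an environment before defining a job, using only the setters above; the values shown are illustrative, not recommended defaults.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Run every operator with 8 parallel instances unless overridden per operator
env.setParallelism(8)

// Retry a failed job execution up to 3 times
env.setNumberOfExecutionRetries(3)

// Reuse objects between user-function invocations; safe only if
// user functions do not hold on to input objects
env.setObjectReuse(true)

println(s"parallelism = ${env.getParallelism}, retries = ${env.getNumberOfExecutionRetries}")
```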

### Restart Strategy Configuration

Configure job restart behavior for fault tolerance.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Sets the restart strategy for job failures
   * @param restartStrategyConfiguration Restart strategy configuration
   */
  def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit

  /**
   * Gets the current restart strategy
   * @return Current restart strategy configuration
   */
  def getRestartStrategy: RestartStrategyConfiguration
}
```
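
**Usage Example:**

A sketch of installing a fixed-delay restart strategy. This section only specifies the `RestartStrategyConfiguration` trait; the `RestartStrategies.fixedDelayRestart` factory shown here is Flink's standard helper for building one, so treat it as an assumption about the surrounding library.

```scala
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Retry a failed job 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(
  RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS))
)
```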

### Data Source Creation

Create DataSets from in-memory data: collections, individual elements, splittable iterators, and generated number sequences.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Creates DataSet from an iterable collection
   * @param data Iterable collection of elements
   * @return DataSet containing the collection elements
   */
  def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]

  /**
   * Creates DataSet from individual elements
   * @param data Variable arguments of elements
   * @return DataSet containing the elements
   */
  def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]

  /**
   * Creates DataSet from a parallel collection
   * @param iterator Splittable iterator for parallel processing
   * @return DataSet from parallel collection
   */
  def fromParallelCollection[T: TypeInformation: ClassTag](iterator: SplittableIterator[T]): DataSet[T]

  /**
   * Generates a sequence of numbers
   * @param from Starting number (inclusive)
   * @param to Ending number (inclusive)
   * @return DataSet containing the number sequence
   */
  def generateSequence(from: Long, to: Long): DataSet[Long]
}
```
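
**Usage Example:**

A short sketch exercising the in-memory sources above; the element values are arbitrary.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// From explicit elements
val words: DataSet[String] = env.fromElements("flink", "spark", "beam")

// From any Scala Iterable
val nums: DataSet[Int] = env.fromCollection(1 to 100)

// Generated range, split across parallel instances
val seq: DataSet[Long] = env.generateSequence(1L, 1000000L)
```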

### File Input Operations

Read data from various file formats and sources.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Reads a text file as DataSet of strings
   * @param filePath Path to the text file
   * @param charsetName Character encoding (default: UTF-8)
   * @return DataSet of text lines
   */
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]

  /**
   * Reads a text file with default value for empty files
   * @param filePath Path to the text file
   * @param defaultValue Default value if file is empty
   * @param charsetName Character encoding (default: UTF-8)
   * @return DataSet of text lines with fallback
   */
  def readTextFileWithValue(filePath: String, defaultValue: String, charsetName: String = "UTF-8"): DataSet[String]

  /**
   * Reads a text file with collection fallback for empty files
   * @param filePath Path to the text file
   * @param defaultValues Collection of default values if file is empty
   * @param charsetName Character encoding (default: UTF-8)
   * @return DataSet of text lines with collection fallback
   */
  def readTextFileWithValue(filePath: String, defaultValues: Iterable[String], charsetName: String = "UTF-8"): DataSet[String]

  /**
   * Reads a CSV file into typed DataSet
   * @param filePath Path to the CSV file
   * @return DataSet of parsed CSV records
   */
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]

  /**
   * Reads primitive values from a file
   * @param filePath Path to the file
   * @param delimiter Value delimiter
   * @return DataSet of primitive values
   */
  def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, delimiter: String = "\n"): DataSet[T]

  /**
   * Reads primitive values from a file with default value fallback
   * @param filePath Path to the file
   * @param defaultValue Default value if file is empty
   * @param delimiter Value delimiter
   * @return DataSet of primitive values with fallback
   */
  def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, defaultValue: T, delimiter: String = "\n"): DataSet[T]

  /**
   * Reads primitive values from a file with collection fallback
   * @param filePath Path to the file
   * @param defaultValues Collection of default values if file is empty
   * @param delimiter Value delimiter
   * @return DataSet of primitive values with collection fallback
   */
  def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, defaultValues: Iterable[T], delimiter: String = "\n"): DataSet[T]

  /**
   * Reads Hadoop SequenceFile format
   * @param keyClass Class type for keys
   * @param valueClass Class type for values
   * @param filePath Path to the sequence file
   * @return DataSet of key-value pairs
   */
  def readSequenceFile[K: ClassTag: TypeInformation, V: ClassTag: TypeInformation](
    keyClass: Class[K],
    valueClass: Class[V],
    filePath: String
  ): DataSet[(K, V)]

  /**
   * Reads Hadoop SequenceFile with Writables
   * @param keyClass Writable key class
   * @param valueClass Writable value class
   * @param filePath Path to the sequence file
   * @return DataSet of Writable key-value pairs
   */
  def readHadoopFile[K <: Writable: ClassTag: TypeInformation, V <: Writable: ClassTag: TypeInformation](
    keyClass: Class[K],
    valueClass: Class[V],
    filePath: String
  ): DataSet[(K, V)]

  /**
   * Reads file using custom input format
   * @param inputFormat Custom file input format
   * @param filePath Path to the file
   * @return DataSet with custom format parsing
   */
  def readFile[T: ClassTag: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataSet[T]

  /**
   * Creates DataSet from custom input format
   * @param inputFormat Custom input format implementation
   * @return DataSet using custom input
   */
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]

  /**
   * Creates DataSet from custom input format with generic parameters
   * @param inputFormat Custom input format with type parameters
   * @param typeInfo Type information for result type
   * @return DataSet using custom input with type safety
   */
  def createInput[T](inputFormat: InputFormat[T, _ <: InputSplit])(implicit typeInfo: TypeInformation[T]): DataSet[T]

  /**
   * Creates DataSet from Hadoop input format
   * @param hadoopInputFormat Hadoop InputFormat class
   * @param keyClass Key type class
   * @param valueClass Value type class
   * @param job Hadoop job configuration
   * @return DataSet from Hadoop source
   */
  def createHadoopInput[K, V](
    hadoopInputFormat: HadoopInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    job: Job
  ): DataSet[(K, V)]
}
```
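
**Usage Example:**

A sketch of the most common file readers; the paths and the `(String, Int)` CSV schema are placeholders. Note that for `readCsvFile` the type parameter drives the parsing.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Plain text, one String per line
val lines: DataSet[String] = env.readTextFile("hdfs:///logs/access.log")

// CSV parsed into tuples according to the type parameter
val pairs: DataSet[(String, Int)] = env.readCsvFile[(String, Int)]("/data/word-counts.csv")

// One primitive value per line (default delimiter "\n")
val ids: DataSet[Long] = env.readFileOfPrimitives[Long]("/data/ids.txt")
```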

### Serialization Configuration

Configure Kryo serialization and type registration for custom types.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Registers a type with a Kryo serializer instance
   * @param clazz Class to register
   * @param serializer Serializer instance
   */
  def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
    clazz: Class[_],
    serializer: T
  ): Unit

  /**
   * Registers a type with a Kryo serializer class
   * @param clazz Class to register
   * @param serializer Serializer class
   */
  def registerTypeWithKryoSerializer(
    clazz: Class[_],
    serializer: Class[_ <: Serializer[_]]
  ): Unit

  /**
   * Adds a default Kryo serializer for a type
   * @param clazz Class to register
   * @param serializer Serializer class
   */
  def addDefaultKryoSerializer(
    clazz: Class[_],
    serializer: Class[_ <: Serializer[_]]
  ): Unit

  /**
   * Adds a default Kryo serializer instance for a type
   * @param clazz Class to register
   * @param serializer Serializer instance
   */
  def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
    clazz: Class[_],
    serializer: T
  ): Unit

  /**
   * Registers a type with Kryo (uses default serialization)
   * @param typeClass Class to register
   */
  def registerType(typeClass: Class[_]): Unit

  /**
   * Registers multiple types with Kryo
   * @param types Classes to register
   */
  def registerType(types: Class[_]*): Unit

  /**
   * Enables/disables force Kryo serialization for all types
   * @param forceKryo Whether to force Kryo for all serialization
   */
  def setForceKryo(forceKryo: Boolean): Unit

  /**
   * Enables/disables force Avro serialization for generic types
   * @param forceAvro Whether to force Avro serialization
   */
  def setForceAvro(forceAvro: Boolean): Unit

  /**
   * Disables auto type registration with Kryo
   */
  def disableAutoTypeRegistration(): Unit

  /**
   * Enables auto type registration with Kryo
   */
  def enableAutoTypeRegistration(): Unit

  /**
   * Gets whether auto type registration is enabled
   * @return True if auto registration is enabled
   */
  def hasAutoTypeRegistrationEnabled: Boolean
}
```
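
**Usage Example:**

A sketch of registering a custom type and serializer. `MyEvent` and `MyEventSerializer` are hypothetical user classes defined inline so the sketch is self-contained; the Kryo `Serializer` API comes from the Kryo library Flink bundles.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.api.scala._

// Hypothetical user type and its Kryo serializer
case class MyEvent(id: Long, payload: String)

class MyEventSerializer extends Serializer[MyEvent] with Serializable {
  override def write(kryo: Kryo, output: Output, e: MyEvent): Unit = {
    output.writeLong(e.id)
    output.writeString(e.payload)
  }
  override def read(kryo: Kryo, input: Input, tpe: Class[MyEvent]): MyEvent =
    MyEvent(input.readLong(), input.readString())
}

val env = ExecutionEnvironment.getExecutionEnvironment

// Pre-register the type so Kryo assigns it a compact, stable id
env.registerType(classOf[MyEvent])

// Bind the custom serializer class to the type
env.registerTypeWithKryoSerializer(classOf[MyEvent], classOf[MyEventSerializer])
```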

### Job Execution

Execute jobs and retrieve results, with support for both synchronous and asynchronous execution.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Executes the job and waits for completion
   * @return JobExecutionResult with execution statistics
   */
  def execute(): JobExecutionResult

  /**
   * Executes the job with a custom name
   * @param jobName Name for the job
   * @return JobExecutionResult with execution statistics
   */
  def execute(jobName: String): JobExecutionResult

  /**
   * Executes the job asynchronously
   * @return JobClient for monitoring execution
   */
  def executeAsync(): JobClient

  /**
   * Executes the job asynchronously with a custom name
   * @param jobName Name for the job
   * @return JobClient for monitoring execution
   */
  def executeAsync(jobName: String): JobClient

  /**
   * Gets the result of the last job execution
   * @return JobExecutionResult from last execution
   */
  def getLastJobExecutionResult: JobExecutionResult
}
```
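
**Usage Example:**

A sketch of synchronous and asynchronous execution. The output path is a placeholder, and the `map`/`writeAsText` calls are standard DataSet API (covered elsewhere in these docs) used here only to give the job something to execute.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.generateSequence(1L, 100L).map(_ * 2).writeAsText("/tmp/doubled")

// Blocking execution: returns once the job finishes
val result = env.execute("double-sequence")
println(s"Job ${result.getJobID} ran in ${result.getNetRuntime} ms")

// Non-blocking alternative: returns a JobClient immediately
// val client = env.executeAsync("double-sequence")
// println(s"Submitted job ${client.getJobID}")
```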

### Job Listeners and Monitoring

Register listeners for job lifecycle events.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Registers a job listener for execution events
   * @param jobListener Listener for job events
   */
  def registerJobListener(jobListener: JobListener): Unit

  /**
   * Clears all registered job listeners
   */
  def clearJobListeners(): Unit
}
```
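
**Usage Example:**

A sketch of a listener that logs lifecycle events, implementing the `JobListener` trait from the Types section below. It assumes the usual convention that the `Throwable` argument is null on success; verify that against the implementation.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

env.registerJobListener(new JobListener {
  override def onJobSubmitted(jobClient: JobClient, t: Throwable): Unit =
    if (t == null) println(s"Submitted job ${jobClient.getJobID}")
    else t.printStackTrace()

  override def onJobExecuted(jobExecutionResult: JobExecutionResult, t: Throwable): Unit =
    if (t == null) println(s"Finished in ${jobExecutionResult.getNetRuntime} ms")
    else t.printStackTrace()
})
```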

### Program Planning

Access to execution plan generation and optimization details.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Gets the execution plan as JSON string
   * @return Execution plan in JSON format
   */
  def getExecutionPlan(): String

  /**
   * Creates a program plan for optimization
   * @param jobName Optional job name
   * @return Program plan object
   */
  def createProgramPlan(jobName: String = ""): Plan
}
```
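
**Usage Example:**

A sketch of dumping the plan as JSON. In the classic DataSet API a plan can only be generated once at least one sink is defined, so a placeholder `writeAsText` sink is included.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.generateSequence(1L, 10L).map(_ + 1).writeAsText("/tmp/plan-demo")

// JSON description of the job graph, e.g. for the Flink plan visualizer
println(env.getExecutionPlan())
```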

### Distributed Cache

Register files for distribution to all cluster nodes.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Registers a file in the distributed cache
   * @param filePath Path to the file
   * @param name Name for accessing the cached file
   * @param executable Whether the file should be executable
   */
  def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
}
```
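
**Usage Example:**

A sketch of shipping a lookup file to every node; the path is a placeholder. Reading the file back via `getRuntimeContext.getDistributedCache` inside a rich function is standard Flink but not specified in this document, so it is shown only as a comment.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

// Distribute a lookup file to all TaskManagers under the name "geo-db"
env.registerCachedFile("hdfs:///data/geo.db", "geo-db")

// Inside a RichMapFunction the file is then available as:
//   val file = getRuntimeContext.getDistributedCache.getFile("geo-db")
```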

### Utility Methods

Additional utility methods for DataSet operations.

```scala { .api }
class ExecutionEnvironment {
  /**
   * Creates union of multiple DataSets
   * @param sets Sequence of DataSets to union
   * @return Unified DataSet
   */
  def union[T](sets: Seq[DataSet[T]]): DataSet[T]
}
```
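
**Usage Example:**

A sketch of unioning several DataSets in one call; the inputs are arbitrary.

```scala
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val a = env.fromElements(1, 2)
val b = env.fromElements(3, 4)
val c = env.fromElements(5, 6)

// Equivalent to a.union(b).union(c)
val all: DataSet[Int] = env.union(Seq(a, b, c))
```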

## Types

```scala { .api }
trait RestartStrategyConfiguration

class JobExecutionResult {
  def getJobID: JobID
  def getNetRuntime: Long
  def getNetRuntime(timeUnit: TimeUnit): Long
  def getAllAccumulatorResults: java.util.Map[String, Object]
  def getAccumulatorResult[T](accumulatorName: String): T
}

trait JobClient {
  def getJobID: JobID
  def cancel(): CompletableFuture[Void]
  def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
}

trait JobListener {
  def onJobSubmitted(jobClient: JobClient, t: Throwable): Unit
  def onJobExecuted(jobExecutionResult: JobExecutionResult, t: Throwable): Unit
}

abstract class Plan {
  def getJobName: String
  def setJobName(jobName: String): Unit
}
```