or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

dataset-operations.md, execution-environment.md, extensions.md, grouped-dataset-operations.md, index.md, join-operations.md, type-system.md, utility-functions.md

docs/execution-environment.md

# Execution Environment

The ExecutionEnvironment is the entry point for all Flink Scala programs. It provides the context for creating DataSets and configuring execution parameters.

## Creating Execution Environments

```scala { .api }
object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
  def createCollectionsEnvironment: ExecutionEnvironment
  def createLocalEnvironmentWithWebUI(config: Configuration): ExecutionEnvironment
}
```

### Usage Examples

```scala
import org.apache.flink.api.scala._

// Get default execution environment (local or cluster based on context)
val env = ExecutionEnvironment.getExecutionEnvironment

// Create local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(4)

// Create remote environment
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  host = "jobmanager",
  port = 8081,
  jarFiles = "myapp.jar"
)

// Create collections-based environment (for testing)
val collEnv = ExecutionEnvironment.createCollectionsEnvironment
```

## Configuration

```scala { .api }
class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def getConfig: ExecutionConfig
  def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
  def registerJobListener(jobListener: JobListener): Unit
  def configure(configuration: Configuration, classLoader: ClassLoader): Unit
}
```

### Configuration Examples

```scala
val env = ExecutionEnvironment.getExecutionEnvironment

// Set parallelism
env.setParallelism(8)

// Configure execution settings
val config = env.getConfig
config.enableClosureCleaner()
config.setGlobalJobParameters(ParameterTool.fromArgs(args))

// Set restart strategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
```

## Data Source Creation

### From Collections and Elements

```scala { .api }
class ExecutionEnvironment {
  def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](
    data: Iterator[T],
    tpe: TypeInformation[T]
  ): DataSet[T]
}
```

```scala
// From individual elements
val numbers = env.fromElements(1, 2, 3, 4, 5)
val words = env.fromElements("Hello", "World", "Flink")

// From collections
val list = List("A", "B", "C")
val dataSet = env.fromCollection(list)

// From iterator with explicit type information
val iterator = Iterator(("Alice", 25), ("Bob", 30))
val people = env.fromCollection(iterator, Types.TUPLE[(String, Int)])
```

### From Files

```scala { .api }
class ExecutionEnvironment {
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
  def readCsvFile[T: ClassTag: TypeInformation](
    filePath: String,
    lineDelimiter: String = "\n",
    fieldDelimiter: String = ",",
    ignoreFirstLine: Boolean = false,
    lenient: Boolean = false,
    includedFields: Array[Int] = null
  ): DataSet[T]
  def readFileOfPrimitives[T: ClassTag: TypeInformation](
    filePath: String,
    delimiter: String,
    tpe: Class[T]
  ): DataSet[T]
  def readFile[T: ClassTag: TypeInformation](
    inputFormat: FileInputFormat[T],
    filePath: String
  ): DataSet[T]
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}
```

```scala
// Read text file
val lines = env.readTextFile("path/to/input.txt")

// Read CSV file as tuples
val csvData = env.readCsvFile[(String, Int, Double)](
  filePath = "data.csv",
  ignoreFirstLine = true,
  fieldDelimiter = ","
)

// Read file of primitives (numbers, strings, etc.)
val numbers = env.readFileOfPrimitives("numbers.txt", "\n", classOf[Int])
val strings = env.readFileOfPrimitives("words.txt", " ", classOf[String])

// Read with custom file input format
val customData = env.readFile(new MyInputFormat(), "path/to/data")

// Create DataSet from generic input format
val inputFormat = new MyCustomInputFormat[MyType]()
val dataSet = env.createInput(inputFormat)
```

### Sequence Generation

```scala { .api }
class ExecutionEnvironment {
  def generateSequence(from: Long, to: Long): DataSet[Long]
  def fromParallelCollection[T: ClassTag: TypeInformation](
    c: SplittableIterator[T]
  ): DataSet[T]
}
```

```scala
// Generate sequence of numbers
val sequence = env.generateSequence(1, 1000000)

// Create from parallel collection
val parallelData = env.fromParallelCollection(new NumberSequenceIterator(1L, 100L))
```

## Execution

```scala { .api }
class ExecutionEnvironment {
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  def executeAsync(): JobClient
  def executeAsync(jobName: String): JobClient
  def getExecutionPlan: String
  def getLastJobExecutionResult: JobExecutionResult
}
```

### Execution Examples

```scala
val env = ExecutionEnvironment.getExecutionEnvironment

// Create and transform data
val result = env.fromElements(1, 2, 3, 4, 5)
  .map(_ * 2)
  .filter(_ > 5)

// Print results (triggers execution)
result.print()

// Execute explicitly with job name
val jobResult = env.execute("My Flink Job")
println(s"Job took ${jobResult.getJobExecutionTime} ms")

// Execute asynchronously
val jobClient = env.executeAsync("Async Job")
val completableFuture = jobClient.getJobExecutionResult
```

## Advanced Configuration

### Kryo Serialization

```scala
val config = env.getConfig

// Register types with Kryo
config.registerKryoType(classOf[MyClass])
config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[MyClassSerializer])

// Add default Kryo serializers
config.addDefaultKryoSerializer(classOf[LocalDateTime], classOf[LocalDateTimeSerializer])
```

### Closure Cleaner

```scala
val config = env.getConfig

// Enable/disable closure cleaner (default: enabled)
config.enableClosureCleaner()
config.disableClosureCleaner()
```

### Cached Files

```scala { .api }
class ExecutionEnvironment {
  def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
}
```

```scala
// Register cached files for distributed access
env.registerCachedFile("hdfs://path/to/data.txt", "data")
env.registerCachedFile("s3://bucket/executable.sh", "script", executable = true)

// Access cached files in user functions via RuntimeContext
class MyMapFunction extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    val cachedFile = getRuntimeContext.getDistributedCache.getFile("data")
    // Use cached file...
    value
  }
}
```

### Global Job Parameters

```scala
import org.apache.flink.api.java.utils.ParameterTool

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)

// Access in transformation functions (requires a Rich* function,
// since only rich functions expose getRuntimeContext)
class MyMapFunction extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
      .asInstanceOf[ParameterTool]
    val prefix = params.get("prefix", "default")
    s"$prefix: $value"
  }
}
```

## Types

```scala { .api }
class ExecutionConfig {
  def setParallelism(parallelism: Int): ExecutionConfig
  def getParallelism: Int
  def enableClosureCleaner(): ExecutionConfig
  def disableClosureCleaner(): ExecutionConfig
  def isClosureCleanerEnabled: Boolean
  def setGlobalJobParameters(globalJobParameters: ExecutionConfig.GlobalJobParameters): ExecutionConfig
  def getGlobalJobParameters: ExecutionConfig.GlobalJobParameters
  def registerKryoType(tpe: Class[_]): ExecutionConfig
  def registerTypeWithKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
  def addDefaultKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
}

class JobExecutionResult {
  def getJobExecutionTime: Long
  def getAccumulatorResult[T](accumulatorName: String): T
  def getAllAccumulatorResults: java.util.Map[String, Object]
}

trait JobClient {
  def getJobExecutionResult: CompletableFuture[JobExecutionResult]
  def getJobStatus: CompletableFuture[JobStatus]
  def cancel(): CompletableFuture[Void]
  def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
}
```