or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-sources-sinks.md execution-environment.md grouping-aggregation.md hadoop-integration.md index.md iterations.md joins-cogroups.md transformations.md type-system.md

docs/execution-environment.md

0

# Execution Environment

1

2

The ExecutionEnvironment is the main entry point for Apache Flink batch programs. It provides methods to create DataSets from various sources, control job execution parameters, and submit jobs for execution.

3

4

## Creating Execution Environments

5

6

```scala { .api }

7

object ExecutionEnvironment {

8

// Get environment based on execution context (local or cluster)

9

def getExecutionEnvironment: ExecutionEnvironment

10

11

// Create local execution environment

12

def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment

13

14

// Create environment for testing with collections

15

def createCollectionsEnvironment: ExecutionEnvironment

16

17

// Create remote execution environment

18

def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment

19

def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*): ExecutionEnvironment

20

}

21

```

22

23

## Configuration Methods

24

25

```scala { .api }

26

class ExecutionEnvironment {

27

// Get the underlying Java execution environment

28

def getJavaEnv: JavaEnv

29

30

// Get execution configuration

31

def getConfig: ExecutionConfig

32

33

// Set default parallelism for all operations

34

def setParallelism(parallelism: Int): Unit

35

def getParallelism: Int

36

37

// Configure restart strategy

38

def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit

39

def getRestartStrategy: RestartStrategyConfiguration

40

41

// Set number of execution retries (deprecated)

42

def setNumberOfExecutionRetries(numRetries: Int): Unit

43

def getNumberOfExecutionRetries: Int

44

45

// Kryo serialization registration

46

def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](

47

clazz: Class[_],

48

serializer: T

49

): Unit

50

def registerTypeWithKryoSerializer(

51

clazz: Class[_],

52

serializer: Class[_ <: Serializer[_]]

53

): Unit

54

def registerType(typeClass: Class[_]): Unit

55

56

// Cached file registration

57

def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit

58

59

// Default parallelism for local execution

60

def setDefaultLocalParallelism(parallelism: Int): Unit

61

}

62

```

63

64

## Job Management

65

66

```scala { .api }

67

class ExecutionEnvironment {

68

// Get unique job identifier

69

def getId: JobID

70

def getIdString: String

71

72

// Session management

73

def startNewSession(): Unit

74

def setSessionTimeout(timeout: Long): Unit

75

def getSessionTimeout: Long

76

77

// Execution results

78

def getLastJobExecutionResult: JobExecutionResult

79

80

// Default parallelism for local execution

81

def setDefaultLocalParallelism(parallelism: Int): Unit

82

83

// Buffer timeout configuration

84

def setBufferTimeout(timeoutMillis: Long): ExecutionEnvironment

85

def getBufferTimeout: Long

86

}

87

```

88

89

## Job Execution

90

91

```scala { .api }

92

class ExecutionEnvironment {

93

// Execute the job and return results

94

def execute(): JobExecutionResult

95

def execute(jobName: String): JobExecutionResult

96

97

// Get execution plan as string

98

def getExecutionPlan(): String

99

}

100

```

101

102

## Data Source Creation

103

104

```scala { .api }

105

class ExecutionEnvironment {

106

// Create DataSet from collections

107

def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]

108

def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]

109

def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]

110

def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]

111

112

// Generate sequence of numbers

113

def generateSequence(from: Long, to: Long): DataSet[Long]

114

115

// Read from files

116

def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]

117

def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]

118

def readCsvFile[T: ClassTag : TypeInformation](filePath: String): CsvReader[T]

119

def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]

120

def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]

121

122

// Custom input formats

123

def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]

124

125

// Hadoop integration

126

def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](

127

inputFormat: MapredInputFormat[K, V],

128

keyClass: Class[K],

129

valueClass: Class[V],

130

inputPath: String

131

): DataSet[(K, V)]

132

133

def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](

134

inputFormat: MapreduceInputFormat[K, V],

135

keyClass: Class[K],

136

valueClass: Class[V],

137

inputPath: String

138

): DataSet[(K, V)]

139

}

140

```

141

142

## Usage Examples

143

144

### Basic Environment Setup

145

146

```scala

147

import org.apache.flink.api.scala._

// Obtain the environment that matches the current context (local JVM or cluster)
val env = ExecutionEnvironment.getExecutionEnvironment

// Default parallelism applied to every operator of this job
env.setParallelism(4)

// Build a small DataSet from literal elements
val data = env.fromElements(1, 2, 3, 4, 5)

// print() is an eager sink: it runs the job itself and writes the elements to stdout.
// Do NOT follow it with env.execute("My Flink Job") -- with no new sink defined that call
// fails with "No new data sinks have been defined since the last execution".
// env.execute(...) is only needed when the plan ends in lazy sinks such as writeAsText.
data.print()

160

```

161

162

### Local Environment for Testing

163

164

```scala

165

import org.apache.flink.api.scala._

// Local, in-JVM environment with an explicit parallelism of 2 (handy for tests)
val localEnv = ExecutionEnvironment.createLocalEnvironment(2)

val data = localEnv.fromCollection(List("hello", "world", "flink"))
val result = data.map(_.toUpperCase)

// print() triggers execution on its own and writes the result to stdout.
// A trailing localEnv.execute() would fail because no new sink has been defined
// since that implicit run; call execute() only for plans that end in lazy sinks.
result.print()

175

```

176

177

### Remote Cluster Execution

178

179

```scala

180

import org.apache.flink.api.scala._

// Connect to a remote Flink cluster; the listed jar is shipped along with the program
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  "flink-jobmanager",
  6123,
  "/path/to/my-job.jar"
)

// "hdfs:///..." (empty authority) resolves against the configured default namenode;
// "hdfs://data/..." would be parsed as a namenode host called "data".
val data = remoteEnv.readTextFile("hdfs:///data/input.txt")

// Word count: positional aggregation (sum(1)) only works on tuples, so every word is
// first paired with a count of 1, grouped on the word (field 0) and summed on field 1.
val processed = data
  .flatMap(_.split(" "))
  .map((_, 1))
  .groupBy(0)
  .sum(1)

// writeAsText is a lazy sink, so the job only runs once execute() is called
processed.writeAsText("hdfs:///data/output")

remoteEnv.execute("Word Count Job")

194

```

195

196

## Accumulators

197

198

Accumulators are a simple and efficient means to aggregate values from distributed functions back to the client program.

199

200

### Accumulator Registration

201

202

```scala { .api }

203

class ExecutionEnvironment {

204

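// Kryo serializer registration (accumulators themselves are registered via RuntimeContext.addAccumulator inside a rich function)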

205

def addDefaultKryoSerializer[T](clazz: Class[T], serializer: Serializer[T]): Unit

206

def addDefaultKryoSerializer[T](clazz: Class[T], serializerClass: Class[_ <: Serializer[T]]): Unit

207

def registerKryoType(clazz: Class[_]): Unit

208

}

209

```

210

211

### Accumulator Types

212

213

```scala { .api }

214

// Basic accumulator types

215

trait Accumulator[V, R] {

216

def add(value: V): Unit

217

def getLocalValue: R

218

def resetLocal(): Unit

219

def merge(other: Accumulator[V, R]): Unit

220

def clone(): Accumulator[V, R]

221

}

222

223

// Built-in accumulator implementations

224

class IntCounter extends Accumulator[Int, Int]

225

class LongCounter extends Accumulator[Long, Long]

226

class DoubleCounter extends Accumulator[Double, Double]

227

class Histogram extends Accumulator[Int, java.util.Map[Int, Int]]

228

229

// List accumulator for collecting values

230

class ListAccumulator[T] extends Accumulator[T, java.util.ArrayList[T]]

231

232

// Maximum/Minimum accumulators

233

class IntMaximum extends Accumulator[Int, Int]

234

class IntMinimum extends Accumulator[Int, Int]

235

class DoubleMaximum extends Accumulator[Double, Double]

236

class DoubleMinimum extends Accumulator[Double, Double]

237

```

238

239

### Usage Examples

240

241

```scala

242

import org.apache.flink.api.scala._

243

import org.apache.flink.api.common.accumulators.{IntCounter, ListAccumulator}

244

import org.apache.flink.api.common.functions.RichMapFunction

245

import org.apache.flink.configuration.Configuration

246

247

val env = ExecutionEnvironment.getExecutionEnvironment

248

249

// Example using accumulator to count processed records

250

/**
 * Upper-cases every incoming record while maintaining two accumulators:
 * "processed-count" (number of records seen) and "errors" (every record
 * containing the substring "error"). Records containing "error" are replaced
 * by the marker string "ERROR_PROCESSED".
 */
class CountingMapFunction extends RichMapFunction[String, String] {

  // Created eagerly as immutable fields instead of null-initialised vars
  private val counter = new IntCounter()
  private val errorList = new ListAccumulator[String]()

  /** Registers both accumulators with the runtime context under their result names. */
  override def open(config: Configuration): Unit = {
    getRuntimeContext.addAccumulator("processed-count", counter)
    getRuntimeContext.addAccumulator("errors", errorList)
  }

  /**
   * Counts the record, then either records it as an error or upper-cases it.
   *
   * @param value the input record
   * @return "ERROR_PROCESSED" when the record contains "error", otherwise the upper-cased record
   */
  override def map(value: String): String = {
    counter.add(1)

    // if/else is an expression, so no explicit `return` is needed
    if (value.contains("error")) {
      errorList.add(value)
      "ERROR_PROCESSED"
    } else {
      value.toUpperCase
    }
  }
}

274

275

val data = env.fromElements("hello", "error1", "world", "error2", "flink")
val result = data.map(new CountingMapFunction())

// execute() only runs plans that end in a sink; without one it fails with
// "No data sinks have been defined". The mapped records are not needed here,
// so they are routed to a discarding sink purely to drive the accumulators.
result.output(new org.apache.flink.api.java.io.DiscardingOutputFormat[String]())

// Execute and read the accumulator results back on the client
val jobResult = env.execute("Accumulator Example")
val processedCount = jobResult.getAccumulatorResult[Int]("processed-count")
val errors = jobResult.getAccumulatorResult[java.util.List[String]]("errors")

println(s"Processed $processedCount records")
println(s"Found errors: ${errors.size()}")

285

```