The ExecutionEnvironment is the entry point for Flink's Scala DataSet (batch) API. It provides the context for creating DataSets, configuring execution parameters such as parallelism and restart behavior, and triggering job execution.
object ExecutionEnvironment {
def getExecutionEnvironment: ExecutionEnvironment
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
def createCollectionsEnvironment: ExecutionEnvironment
def createLocalEnvironmentWithWebUI(config: Configuration): ExecutionEnvironment
}

import org.apache.flink.api.scala._
// Get default execution environment (local or cluster based on context)
val env = ExecutionEnvironment.getExecutionEnvironment
// Create local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(4)
// Create remote environment (JobManager host, REST port, and the JAR files to ship to the cluster)
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment("jobmanager", 8081, "myapp.jar")
// Create collections-based environment (for testing)
val collEnv = ExecutionEnvironment.createCollectionsEnvironment
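Putting these pieces together, a complete DataSet program obtains an environment, builds and transforms DataSets, and ends in a sink or an action that triggers execution. A minimal sketch (the object name and the data are illustrative):

import org.apache.flink.api.scala._

object WordLengths {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Build a DataSet and apply a transformation
    val lengths = env
      .fromElements("Hello", "World", "Flink")
      .map(word => (word, word.length))
    // collect() brings the result back to the client and runs the job
    lengths.collect().foreach(println)
  }
}

When run from an IDE, getExecutionEnvironment resolves to a local environment; when the packaged program is submitted to a cluster, it returns an environment connected to that cluster.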

class ExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
def getConfig: ExecutionConfig
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
def registerJobListener(jobListener: JobListener): Unit
def configure(configuration: Configuration, classLoader: ClassLoader): Unit
}

val env = ExecutionEnvironment.getExecutionEnvironment
// Set parallelism
env.setParallelism(8)
// Configure execution settings
val config = env.getConfig
config.enableClosureCleaner()
config.setGlobalJobParameters(ParameterTool.fromArgs(args))
// Set restart strategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
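registerJobListener accepts callbacks that fire when a job is submitted and when it finishes. A hedged sketch of a listener that only logs these events (the class name and messages are illustrative; either callback argument may be null on failure):

import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.core.execution.{JobClient, JobListener}

// Logs job lifecycle events to stdout
class LoggingJobListener extends JobListener {
  override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit =
    if (throwable == null) println(s"Submitted job ${jobClient.getJobID}")
    else println(s"Job submission failed: ${throwable.getMessage}")

  override def onJobExecuted(result: JobExecutionResult, throwable: Throwable): Unit =
    if (throwable == null) println(s"Job finished in ${result.getNetRuntime} ms")
    else println(s"Job execution failed: ${throwable.getMessage}")
}

env.registerJobListener(new LoggingJobListener)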

class ExecutionEnvironment {
def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
def fromCollection[T: ClassTag: TypeInformation](data: Iterator[T]): DataSet[T]
}

// From individual elements
val numbers = env.fromElements(1, 2, 3, 4, 5)
val words = env.fromElements("Hello", "World", "Flink")
// From collections
val list = List("A", "B", "C")
val dataSet = env.fromCollection(list)
// From an iterator (element type information comes from the implicits of org.apache.flink.api.scala._)
val iterator = Iterator(("Alice", 25), ("Bob", 30))
val people = env.fromCollection(iterator)
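With the implicits from import org.apache.flink.api.scala._ in scope, Scala case classes can also be used as element types directly, without spelling out any TypeInformation. A small illustrative sketch (the Person case class is made up):

// Person is an arbitrary example case class
case class Person(name: String, age: Int)

val persons = env.fromCollection(Seq(
  Person("Alice", 25),
  Person("Bob", 30)
))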

class ExecutionEnvironment {
def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
def readCsvFile[T: ClassTag: TypeInformation](
filePath: String,
lineDelimiter: String = "\n",
fieldDelimiter: String = ",",
ignoreFirstLine: Boolean = false,
lenient: Boolean = false,
includedFields: Array[Int] = null
): DataSet[T]
def readFileOfPrimitives[T: ClassTag: TypeInformation](
filePath: String,
delimiter: String = "\n"
): DataSet[T]
def readFile[T: ClassTag: TypeInformation](
inputFormat: FileInputFormat[T],
filePath: String
): DataSet[T]
def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

// Read text file
val lines = env.readTextFile("path/to/input.txt")
// Read CSV file as tuples
val csvData = env.readCsvFile[(String, Int, Double)](
filePath = "data.csv",
ignoreFirstLine = true,
fieldDelimiter = ","
)
// Read file of primitives (the element type is given as a type parameter)
val numbers = env.readFileOfPrimitives[Int]("numbers.txt", "\n")
val strings = env.readFileOfPrimitives[String]("words.txt", " ")
// Read with custom file input format
val customData = env.readFile(new MyInputFormat(), "path/to/data")
// Create DataSet from generic input format
val inputFormat = new MyCustomInputFormat[MyType]()
val dataSet = env.createInput(inputFormat)
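MyInputFormat and MyCustomInputFormat above are placeholders. As a rough illustration of what a custom format for createInput might look like, here is a hedged sketch based on GenericInputFormat (the class name and emitted values are invented; this simple version is not split-aware, so every parallel instance emits the full range):

import org.apache.flink.api.common.io.GenericInputFormat

// Emits the integers 0 until count
class RangeInputFormat(count: Int) extends GenericInputFormat[Int] {
  private var next = 0
  override def reachedEnd(): Boolean = next >= count
  override def nextRecord(reuse: Int): Int = {
    val value = next
    next += 1
    value
  }
}

val generated = env.createInput(new RangeInputFormat(10))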

class ExecutionEnvironment {
def generateSequence(from: Long, to: Long): DataSet[Long]
def fromParallelCollection[T: ClassTag: TypeInformation](
c: SplittableIterator[T]
): DataSet[T]
}

// Generate sequence of numbers
val sequence = env.generateSequence(1, 1000000)
// Create from parallel collection
// NumberSequenceIterator is a SplittableIterator bundled with Flink
import org.apache.flink.util.NumberSequenceIterator
val parallelData = env.fromParallelCollection(new NumberSequenceIterator(1L, 100L))

class ExecutionEnvironment {
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
def executeAsync(): JobClient
def executeAsync(jobName: String): JobClient
def getExecutionPlan: String
def getLastJobExecutionResult: JobExecutionResult
}

val env = ExecutionEnvironment.getExecutionEnvironment
// Create and transform data
val result = env.fromElements(1, 2, 3, 4, 5)
.map(_ * 2)
.filter(_ > 5)
// print() is itself a sink: it collects the result and triggers execution immediately
result.print()
// execute() is needed when the plan ends in lazy sinks such as writeAsText or output.
// Each call runs the sinks defined since the previous execution, so define a sink first;
// calling execute() right after print() alone fails because no new sinks have been defined.
result.writeAsText("/tmp/result")   // hypothetical output path
val jobResult = env.execute("My Flink Job")
println(s"Job took ${jobResult.getNetRuntime} ms")
// Execute asynchronously (again after defining a new sink)
result.writeAsText("/tmp/result-async")   // hypothetical output path
val jobClient = env.executeAsync("Async Job")
val completableFuture = jobClient.getJobExecutionResult
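The JobClient returned by executeAsync can be used to inspect and wait for the job. A short sketch continuing the snippet above (the timeout and log messages are illustrative):

import java.util.concurrent.TimeUnit

// Inspect the submitted job and wait for its result
println(s"Submitted job ${jobClient.getJobID}")
println(s"Current status: ${jobClient.getJobStatus.get()}")
// Block for at most 10 minutes waiting for the final result
val asyncResult = completableFuture.get(10, TimeUnit.MINUTES)
println(s"Async job finished in ${asyncResult.getNetRuntime} ms")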

val config = env.getConfig
// Register types with Kryo
config.registerKryoType(classOf[MyClass])
config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[MyClassSerializer])
// Add default Kryo serializers
config.addDefaultKryoSerializer(classOf[LocalDateTime], classOf[LocalDateTimeSerializer])
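LocalDateTimeSerializer above is not shipped with Flink; it stands for a user-provided Kryo serializer. A hedged sketch of one possible implementation, encoding the value as its ISO-8601 string:

import java.time.LocalDateTime
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Illustrative custom serializer: writes LocalDateTime as an ISO-8601 string
class LocalDateTimeSerializer extends Serializer[LocalDateTime] {
  override def write(kryo: Kryo, output: Output, value: LocalDateTime): Unit =
    output.writeString(value.toString)

  override def read(kryo: Kryo, input: Input, tpe: Class[LocalDateTime]): LocalDateTime =
    LocalDateTime.parse(input.readString())
}

Because it is registered by class, the serializer needs a zero-argument constructor, which this sketch satisfies.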

val config = env.getConfig
// Enable/disable closure cleaner (default: enabled)
config.enableClosureCleaner()
config.disableClosureCleaner()

class ExecutionEnvironment {
def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
}

// Register cached files for distributed access
env.registerCachedFile("hdfs://path/to/data.txt", "data")
env.registerCachedFile("s3://bucket/executable.sh", "script", executable = true)
// Access cached files in user functions via RuntimeContext
class MyMapFunction extends RichMapFunction[String, String] {
override def map(value: String): String = {
val cachedFile = getRuntimeContext.getDistributedCache.getFile("data")
// Use cached file...
value
}
}

import org.apache.flink.api.java.utils.ParameterTool
val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
// Access in transformation functions; getRuntimeContext requires a rich function
class MyMapFunction extends RichMapFunction[String, String] {
override def map(value: String): String = {
val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
.asInstanceOf[ParameterTool]
val prefix = params.get("prefix", "default")
s"$prefix: $value"
}
}

class ExecutionConfig {
def setParallelism(parallelism: Int): ExecutionConfig
def getParallelism: Int
def enableClosureCleaner(): ExecutionConfig
def disableClosureCleaner(): ExecutionConfig
def isClosureCleanerEnabled: Boolean
def setGlobalJobParameters(globalJobParameters: GlobalJobParameters): ExecutionConfig
def getGlobalJobParameters: GlobalJobParameters
def registerKryoType(tpe: Class[_]): ExecutionConfig
def registerTypeWithKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
def addDefaultKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
}
class JobExecutionResult {
def getNetRuntime: Long
def getAccumulatorResult[T](accumulatorName: String): T
def getAllAccumulatorResults: java.util.Map[String, Object]
}
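The accumulator results exposed by JobExecutionResult come from accumulators registered inside rich functions. A hedged end-to-end sketch (the accumulator name, the mapper, and the output format choice are illustrative):

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

// Counts how many records the map operator processed
class CountingMapper extends RichMapFunction[String, String] {
  private val counter = new IntCounter()

  override def open(parameters: Configuration): Unit =
    getRuntimeContext.addAccumulator("records-seen", counter)

  override def map(value: String): String = {
    counter.add(1)
    value
  }
}

val env = ExecutionEnvironment.getExecutionEnvironment
env.fromElements("a", "b", "c")
  .map(new CountingMapper())
  .output(new DiscardingOutputFormat[String]())

val result = env.execute("Accumulator example")
val seen = result.getAccumulatorResult[Integer]("records-seen")
println(s"Mapper processed $seen records")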
trait JobClient {
def getJobExecutionResult: CompletableFuture[JobExecutionResult]
def getJobStatus: CompletableFuture[JobStatus]
def cancel(): CompletableFuture[Void]
def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
}