Execution Environment

The ExecutionEnvironment is the entry point for every Flink Scala DataSet (batch) program. It provides the context for creating DataSets and for configuring how the job executes.
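
Putting the pieces together, a minimal end-to-end program built from the APIs described in this document looks like this (a sketch; the input strings are placeholders):

import org.apache.flink.api.scala._

object WordCount {
  def main(args: Array[String]): Unit = {
    // Obtain an environment, build a DataSet pipeline, and trigger execution
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.fromElements("to be", "or not to be")
      .flatMap(_.split(" "))   // split lines into words
      .map((_, 1))             // pair each word with a count of 1
      .groupBy(0)              // group by the word field
      .sum(1)                  // sum the counts per word
      .print()                 // print() is a sink and triggers execution
  }
}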

Creating Execution Environments

object ExecutionEnvironment {
  def getExecutionEnvironment: ExecutionEnvironment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
  def createCollectionsEnvironment: ExecutionEnvironment
  def createLocalEnvironmentWithWebUI(config: Configuration): ExecutionEnvironment
}

Usage Examples

import org.apache.flink.api.scala._

// Get default execution environment (local or cluster based on context)
val env = ExecutionEnvironment.getExecutionEnvironment

// Create local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(4)

// Create remote environment
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  host = "jobmanager", 
  port = 8081, 
  jarFiles = "myapp.jar"
)

// Create collections-based environment (for testing)
val collEnv = ExecutionEnvironment.createCollectionsEnvironment

Configuration

class ExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def getConfig: ExecutionConfig
  def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
  def registerJobListener(jobListener: JobListener): Unit
  def configure(configuration: Configuration, classLoader: ClassLoader): Unit
}

Configuration Examples

val env = ExecutionEnvironment.getExecutionEnvironment

// Set parallelism
env.setParallelism(8)

// Configure execution settings
val config = env.getConfig
config.enableClosureCleaner()

// Make command-line arguments visible to all user functions
import org.apache.flink.api.java.utils.ParameterTool
config.setGlobalJobParameters(ParameterTool.fromArgs(args))

// Set restart strategy: at most 3 restart attempts, 10000 ms between attempts
import org.apache.flink.api.common.restartstrategy.RestartStrategies
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
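
registerJobListener, declared above, receives callbacks when a job is submitted and when it finishes. A minimal sketch, assuming Flink's org.apache.flink.core.execution.JobListener interface (both callbacks also receive a Throwable that is non-null on failure):

import org.apache.flink.core.execution.{JobClient, JobListener}
import org.apache.flink.api.common.JobExecutionResult

env.registerJobListener(new JobListener {
  override def onJobSubmitted(jobClient: JobClient, t: Throwable): Unit =
    if (t == null) println("Job submitted")

  override def onJobExecuted(result: JobExecutionResult, t: Throwable): Unit =
    if (t == null) println(s"Job finished in ${result.getJobExecutionTime} ms")
})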

Data Source Creation

From Collections and Elements

class ExecutionEnvironment {
  def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
  def fromCollection[T: ClassTag: TypeInformation](
    data: Iterator[T], 
    tpe: TypeInformation[T]
  ): DataSet[T]
}

// From individual elements
val numbers = env.fromElements(1, 2, 3, 4, 5)
val words = env.fromElements("Hello", "World", "Flink")

// From collections
val list = List("A", "B", "C")
val dataSet = env.fromCollection(list)

// From iterator with explicit type information
val iterator = Iterator(("Alice", 25), ("Bob", 30))
val people = env.fromCollection(iterator, Types.TUPLE[(String, Int)])

From Files

class ExecutionEnvironment {
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
  def readCsvFile[T: ClassTag: TypeInformation](
    filePath: String,
    lineDelimiter: String = "\n",
    fieldDelimiter: String = ",",
    ignoreFirstLine: Boolean = false,
    lenient: Boolean = false,
    includedFields: Array[Int] = null
  ): DataSet[T]
  def readFileOfPrimitives[T: ClassTag: TypeInformation](
    filePath: String,
    delimiter: String,
    tpe: Class[T]
  ): DataSet[T]
  def readFile[T: ClassTag: TypeInformation](
    inputFormat: FileInputFormat[T], 
    filePath: String
  ): DataSet[T]
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}

// Read text file
val lines = env.readTextFile("path/to/input.txt")

// Read CSV file as tuples
val csvData = env.readCsvFile[(String, Int, Double)](
  filePath = "data.csv",
  ignoreFirstLine = true,
  fieldDelimiter = ","
)

// Read file of primitives (numbers, strings, etc.)
val numbers = env.readFileOfPrimitives("numbers.txt", "\n", classOf[Int])
val strings = env.readFileOfPrimitives("words.txt", " ", classOf[String])

// Read with custom file input format
val customData = env.readFile(new MyInputFormat(), "path/to/data")

// Create DataSet from generic input format
val inputFormat = new MyCustomInputFormat[MyType]()
val dataSet = env.createInput(inputFormat)
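
MyInputFormat and MyCustomInputFormat above are placeholders for user-defined formats. For a concrete, runnable variant you can pass one of Flink's built-in formats instead; a sketch using TextInputFormat (the path is a placeholder):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

// TextInputFormat is a FileInputFormat[String] shipped with Flink
val textFormat = new TextInputFormat(new Path("path/to/data"))
val textLines: DataSet[String] = env.readFile(textFormat, "path/to/data")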

Sequence Generation

class ExecutionEnvironment {
  def generateSequence(from: Long, to: Long): DataSet[Long]
  def fromParallelCollection[T: ClassTag: TypeInformation](
    c: SplittableIterator[T]
  ): DataSet[T]
}

// Generate sequence of numbers
val sequence = env.generateSequence(1, 1000000)

// Create from a parallel collection (split across parallel source instances)
import org.apache.flink.util.NumberSequenceIterator
val parallelData = env.fromParallelCollection(new NumberSequenceIterator(1L, 100L))

Execution

class ExecutionEnvironment {
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  def executeAsync(): JobClient
  def executeAsync(jobName: String): JobClient
  def getExecutionPlan: String
  def getLastJobExecutionResult: JobExecutionResult
}

Execution Examples

val env = ExecutionEnvironment.getExecutionEnvironment

// Create and transform data
val result = env.fromElements(1, 2, 3, 4, 5)
  .map(_ * 2)
  .filter(_ > 5)

// Print results (triggers execution)
result.print()

// Execute explicitly with job name
val jobResult = env.execute("My Flink Job")
println(s"Job took ${jobResult.getJobExecutionTime} ms")

// Execute asynchronously
val jobClient = env.executeAsync("Async Job")
val completableFuture = jobClient.getJobExecutionResult
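
getExecutionPlan, declared above, returns the optimizer plan as a JSON string without running the job. A sketch (at least one sink must be defined before requesting the plan; the output path is a placeholder):

val planEnv = ExecutionEnvironment.getExecutionEnvironment
planEnv.fromElements(1, 2, 3)
  .map(_ * 2)
  .writeAsText("/tmp/out") // define a sink so the plan is non-empty

// Prints the JSON execution plan; the job itself is not executed
println(planEnv.getExecutionPlan)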

Advanced Configuration

Kryo Serialization

val config = env.getConfig

// Register types with Kryo
config.registerKryoType(classOf[MyClass])
config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[MyClassSerializer])

// Add default Kryo serializers
config.addDefaultKryoSerializer(classOf[LocalDateTime], classOf[LocalDateTimeSerializer])
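
LocalDateTimeSerializer above is a user-supplied class. A minimal sketch of what such a serializer could look like, assuming the com.esotericsoftware.kryo API bundled with Flink (serializing via the ISO-8601 string form for simplicity):

import java.time.LocalDateTime
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

class LocalDateTimeSerializer extends Serializer[LocalDateTime] {
  override def write(kryo: Kryo, output: Output, value: LocalDateTime): Unit =
    output.writeString(value.toString) // ISO-8601 text form

  override def read(kryo: Kryo, input: Input, tpe: Class[LocalDateTime]): LocalDateTime =
    LocalDateTime.parse(input.readString())
}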

Closure Cleaner

val config = env.getConfig

// Enable/disable closure cleaner (default: enabled)
config.enableClosureCleaner()
config.disableClosureCleaner()
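
The closure cleaner strips unneeded references to enclosing objects out of the closures passed to transformations, so the closures stay serializable even when the enclosing object is not. A sketch of the situation it addresses (class names are illustrative):

class Pipeline {
  val heavyState = new Object // not serializable, and not used by the closure

  def build(env: ExecutionEnvironment): DataSet[Int] =
    // The compiled lambda may capture a reference to the enclosing Pipeline;
    // with the cleaner enabled that unused reference is removed before the
    // function is serialized and shipped to the cluster.
    env.fromElements(1, 2, 3).map(_ * 2)
}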

Cached Files

class ExecutionEnvironment {
  def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
}

// Register cached files for distributed access
env.registerCachedFile("hdfs://path/to/data.txt", "data")
env.registerCachedFile("s3://bucket/executable.sh", "script", executable = true)

// Access cached files in user functions via RuntimeContext
class MyMapFunction extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    val cachedFile = getRuntimeContext.getDistributedCache.getFile("data")
    // Use cached file...
    value
  }
}

Global Job Parameters

import org.apache.flink.api.java.utils.ParameterTool

val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)

// Access in transformation functions (a rich function is required for RuntimeContext access)
class MyMapFunction extends RichMapFunction[String, String] {
  override def map(value: String): String = {
    val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
      .asInstanceOf[ParameterTool]
    val prefix = params.get("prefix", "default")
    s"$prefix: $value"
  }
}

Types

class ExecutionConfig {
  def setParallelism(parallelism: Int): ExecutionConfig
  def getParallelism: Int
  def enableClosureCleaner(): ExecutionConfig
  def disableClosureCleaner(): ExecutionConfig
  def isClosureCleanerEnabled: Boolean
  def setGlobalJobParameters(globalJobParameters: ExecutionConfig.GlobalJobParameters): ExecutionConfig
  def getGlobalJobParameters: ExecutionConfig.GlobalJobParameters
  def registerKryoType(tpe: Class[_]): ExecutionConfig
  def registerTypeWithKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
  def addDefaultKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
}

class JobExecutionResult {
  def getJobExecutionTime: Long
  def getAccumulatorResult[T](accumulatorName: String): T
  def getAllAccumulatorResults: java.util.Map[String, Object]
}
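
Accumulator results are fetched by name from the JobExecutionResult. An end-to-end sketch, assuming Flink's IntCounter accumulator and the rich-function lifecycle (open registers the accumulator with the runtime context):

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class CountingMapper extends RichMapFunction[String, String] {
  private val counter = new IntCounter()

  override def open(parameters: Configuration): Unit =
    getRuntimeContext.addAccumulator("lines", counter)

  override def map(value: String): String = {
    counter.add(1) // count every processed record
    value
  }
}

val env = ExecutionEnvironment.getExecutionEnvironment
env.fromElements("a", "b", "c").map(new CountingMapper).print()
val lines: Integer = env.getLastJobExecutionResult.getAccumulatorResult[Integer]("lines")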

trait JobClient {
  def getJobExecutionResult: CompletableFuture[JobExecutionResult]
  def getJobStatus: CompletableFuture[JobStatus]
  def cancel(): CompletableFuture[Void]
  def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
}
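
A JobClient obtained from executeAsync can be used to monitor and control the running job. A sketch, assuming JobStatus from org.apache.flink.api.common (the blocking get() calls are for brevity):

import org.apache.flink.api.common.JobStatus

val client = env.executeAsync("Monitored Job")
val status: JobStatus = client.getJobStatus.get()
if (status == JobStatus.RUNNING) {
  client.cancel().get() // request cancellation and wait for the acknowledgement
}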