or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

binary-operations.md data-transformations.md execution-environment.md grouping-aggregation.md index.md input-output.md partitioning-distribution.md type-system.md utilities.md
tile.json

docs/execution-environment.md

Execution Environment

The ExecutionEnvironment is the primary entry point for Flink programs, providing the context in which jobs are executed and methods to create DataSets from various sources.

Capabilities

Environment Creation

Factory methods for creating execution environments with different configurations.

object ExecutionEnvironment {
  /**
   * Creates an execution environment chosen from the current context (local or remote).
   * @return ExecutionEnvironment instance
   */
  def getExecutionEnvironment: ExecutionEnvironment
  
  /**
   * Creates a local execution environment.
   * @param parallelism Degree of parallelism; defaults to
   *                    `Runtime.getRuntime.availableProcessors()`, i.e. the number of
   *                    processors available to the JVM
   * @return Local ExecutionEnvironment
   */
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  
  /**
   * Creates a local execution environment that also exposes a web UI.
   * @param config Configuration to use; may be omitted, in which case it defaults to `null`
   * @return Local ExecutionEnvironment with web interface
   */
  def createLocalEnvironmentWithWebUI(config: Configuration = null): ExecutionEnvironment
  
  /**
   * Creates an environment that executes on a remote cluster.
   * @param host Cluster host address
   * @param port Cluster port
   * @param jarFiles Zero or more JAR files to distribute to the cluster (varargs)
   * @return Remote ExecutionEnvironment
   */
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
  
  /**
   * Creates a collections-based environment intended for testing.
   * @return Collections-based ExecutionEnvironment
   */
  def createCollectionsEnvironment: ExecutionEnvironment
}

Usage Examples:

import org.apache.flink.api.scala._

// Let the execution context decide between a local and a remote environment
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

// Run locally with a parallelism of 4
val localEnv: ExecutionEnvironment = ExecutionEnvironment.createLocalEnvironment(4)

// Target the cluster at cluster-host:6123 and ship my-job.jar to it
val remoteEnv: ExecutionEnvironment =
  ExecutionEnvironment.createRemoteEnvironment("cluster-host", 6123, "my-job.jar")

Environment Configuration

Methods for configuring execution parameters and behavior.

class ExecutionEnvironment {
  /**
   * Sets the parallelism for operations in this environment.
   * @param parallelism Degree of parallelism
   */
  def setParallelism(parallelism: Int): Unit
  
  /**
   * Gets the current parallelism setting.
   * @return Current parallelism level
   */
  def getParallelism: Int
  
  /**
   * Gets the execution configuration object.
   * @return ExecutionConfig for fine-tuning behavior
   */
  def getConfig: ExecutionConfig
  
  /**
   * Configures the environment from the given settings.
   * @param configuration Configuration object to read settings from
   * @param classLoader Class loader for user code
   */
  def configure(configuration: ReadableConfig, classLoader: ClassLoader): Unit
  
  /**
   * Sets the buffer timeout for network transfers.
   * @param timeoutMillis Timeout in milliseconds
   */
  def setBufferTimeout(timeoutMillis: Long): Unit
  
  /**
   * Gets the current buffer timeout setting.
   * @return Current buffer timeout in milliseconds
   */
  def getBufferTimeout: Long
  
  /**
   * Enables or disables object reuse mode for better performance.
   * @param objectReuse Whether to enable object reuse
   */
  def setObjectReuse(objectReuse: Boolean): Unit
  
  /**
   * Gets the current object reuse setting.
   * @return True if object reuse is enabled
   */
  def getObjectReuse: Boolean
  
  /**
   * Sets the default maximum degree of parallelism.
   * @param maxParallelism Maximum parallelism level
   */
  def setMaxParallelism(maxParallelism: Int): Unit
  
  /**
   * Gets the default maximum degree of parallelism.
   * @return Maximum parallelism level
   */
  def getMaxParallelism: Int
  
  /**
   * Sets the number of execution retries (counterpart of
   * `getNumberOfExecutionRetries`).
   * @param numberOfTaskSlots Number of retries to configure.
   *        NOTE(review): the parameter name suggests task slots, but the method
   *        name and its getter refer to execution retries; confirm the intended
   *        parameter name against the real API.
   */
  def setNumberOfExecutionRetries(numberOfTaskSlots: Int): Unit
  
  /**
   * Gets the number of execution retries.
   * @return Number of retries configured
   */
  def getNumberOfExecutionRetries: Int
}

Restart Strategy Configuration

Configure job restart behavior for fault tolerance.

class ExecutionEnvironment {
  /**
   * Sets the restart strategy applied when a job fails.
   * @param restartStrategyConfiguration Restart strategy configuration to use
   */
  def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
  
  /**
   * Gets the restart strategy currently configured (counterpart of
   * `setRestartStrategy`).
   * @return Current restart strategy configuration
   */
  def getRestartStrategy: RestartStrategyConfiguration
}

Data Source Creation

Create DataSets from various data sources including files, collections, and custom formats.

class ExecutionEnvironment {
  /**
   * Creates a DataSet from an iterable collection.
   * @tparam T Element type; an implicit TypeInformation and ClassTag must be available
   * @param data Iterable collection of elements
   * @return DataSet containing the collection elements
   */
  def fromCollection[T: TypeInformation: ClassTag](data: Iterable[T]): DataSet[T]
  
  /**
   * Creates a DataSet from individually listed elements.
   * @tparam T Element type; an implicit TypeInformation and ClassTag must be available
   * @param data Elements to include (varargs)
   * @return DataSet containing the elements
   */
  def fromElements[T: TypeInformation: ClassTag](data: T*): DataSet[T]
  
  /**
   * Creates a DataSet from a parallel collection.
   * @tparam T Element type; an implicit TypeInformation and ClassTag must be available
   * @param iterator Splittable iterator for parallel processing
   * @return DataSet from parallel collection
   */
  def fromParallelCollection[T: TypeInformation: ClassTag](iterator: SplittableIterator[T]): DataSet[T]
  
  /**
   * Generates a sequence of numbers.
   * @param from Starting number (inclusive)
   * @param to Ending number (inclusive)
   * @return DataSet containing the number sequence
   */
  def generateSequence(from: Long, to: Long): DataSet[Long]
}

File Input Operations

Read data from various file formats and sources.

class ExecutionEnvironment {
  /**
   * Reads a text file as a DataSet of strings.
   * @param filePath Path to the text file
   * @param charsetName Character encoding (default: UTF-8)
   * @return DataSet of text lines
   */
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  
  /**
   * Reads a text file, with a single default value for empty files.
   * @param filePath Path to the text file
   * @param defaultValue Default value if file is empty
   * @param charsetName Character encoding (default: UTF-8)
   * @return DataSet of text lines with fallback
   */
  def readTextFileWithValue(filePath: String, defaultValue: String, charsetName: String = "UTF-8"): DataSet[String]
  
  /**
   * Reads a text file, with a collection of default values for empty files.
   *
   * NOTE(review): this overload and the one above both declare a default for
   * `charsetName`. Scala allows only one overloaded alternative of a method to
   * define default arguments, so confirm these signatures against the real API.
   * @param filePath Path to the text file
   * @param defaultValues Collection of default values if file is empty
   * @param charsetName Character encoding (default: UTF-8)
   * @return DataSet of text lines with collection fallback
   */
  def readTextFileWithValue(filePath: String, defaultValues: Iterable[String], charsetName: String = "UTF-8"): DataSet[String]
  
  /**
   * Reads a CSV file into a typed DataSet.
   * @tparam T Record type; an implicit ClassTag and TypeInformation must be available
   * @param filePath Path to the CSV file
   * @return DataSet of parsed CSV records
   */
  def readCsvFile[T: ClassTag: TypeInformation](filePath: String): DataSet[T]
  
  /**
   * Reads primitive values from a file.
   * @tparam T Value type; an implicit ClassTag and TypeInformation must be available
   * @param filePath Path to the file
   * @param delimiter Value delimiter (default: newline, "\n")
   * @return DataSet of primitive values
   */
  def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, delimiter: String = "\n"): DataSet[T]
  
  /**
   * Reads primitive values from a file, with a single default value as fallback.
   *
   * NOTE(review): all three `readFileOfPrimitives` overloads declare a default
   * for `delimiter`. Scala allows only one overloaded alternative to define
   * default arguments, so confirm these signatures against the real API.
   * @tparam T Value type; an implicit ClassTag and TypeInformation must be available
   * @param filePath Path to the file
   * @param defaultValue Default value if file is empty
   * @param delimiter Value delimiter (default: newline, "\n")
   * @return DataSet of primitive values with fallback
   */
  def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, defaultValue: T, delimiter: String = "\n"): DataSet[T]
  
  /**
   * Reads primitive values from a file, with a collection of default values as fallback.
   * @tparam T Value type; an implicit ClassTag and TypeInformation must be available
   * @param filePath Path to the file
   * @param defaultValues Collection of default values if file is empty
   * @param delimiter Value delimiter (default: newline, "\n")
   * @return DataSet of primitive values with collection fallback
   */
  def readFileOfPrimitives[T: ClassTag: TypeInformation](filePath: String, defaultValues: Iterable[T], delimiter: String = "\n"): DataSet[T]
  
  /**
   * Reads a file in Hadoop SequenceFile format.
   * @tparam K Key type; an implicit ClassTag and TypeInformation must be available
   * @tparam V Value type; an implicit ClassTag and TypeInformation must be available
   * @param keyClass Class type for keys
   * @param valueClass Class type for values
   * @param filePath Path to the sequence file
   * @return DataSet of key-value pairs
   */
  def readSequenceFile[K: ClassTag: TypeInformation, V: ClassTag: TypeInformation](
    keyClass: Class[K], 
    valueClass: Class[V], 
    filePath: String
  ): DataSet[(K, V)]
  
  /**
   * Reads a Hadoop SequenceFile whose keys and values are Writables.
   * @tparam K Key type; must be a subtype of Writable
   * @tparam V Value type; must be a subtype of Writable
   * @param keyClass Writable key class
   * @param valueClass Writable value class
   * @param filePath Path to the sequence file
   * @return DataSet of Writable key-value pairs
   */
  def readHadoopFile[K <: Writable: ClassTag: TypeInformation, V <: Writable: ClassTag: TypeInformation](
    keyClass: Class[K],
    valueClass: Class[V], 
    filePath: String
  ): DataSet[(K, V)]
  
  /**
   * Reads a file using a custom file input format.
   * @tparam T Element type; an implicit ClassTag and TypeInformation must be available
   * @param inputFormat Custom file input format
   * @param filePath Path to the file
   * @return DataSet with custom format parsing
   */
  def readFile[T: ClassTag: TypeInformation](inputFormat: FileInputFormat[T], filePath: String): DataSet[T]
  
  /**
   * Creates a DataSet from a custom input format.
   * @tparam T Element type; an implicit ClassTag and TypeInformation must be available
   * @param inputFormat Custom input format implementation
   * @return DataSet using custom input
   */
  def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
  
  /**
   * Creates a DataSet from a custom input format whose split type is a subtype
   * of InputSplit, taking the result type information as an implicit parameter.
   * @tparam T Element type produced by the input format
   * @param inputFormat Custom input format with type parameters
   * @param typeInfo Implicit type information for the result type
   * @return DataSet using custom input with type safety
   */
  def createInput[T](inputFormat: InputFormat[T, _ <: InputSplit])(implicit typeInfo: TypeInformation[T]): DataSet[T]
  
  /**
   * Creates a DataSet from a Hadoop input format.
   * @tparam K Key type
   * @tparam V Value type
   * @param hadoopInputFormat Hadoop input format instance (a `HadoopInputFormat[K, V]`)
   * @param keyClass Key type class
   * @param valueClass Value type class
   * @param job Hadoop job configuration
   * @return DataSet from Hadoop source
   */
  def createHadoopInput[K, V](
    hadoopInputFormat: HadoopInputFormat[K, V],
    keyClass: Class[K],
    valueClass: Class[V],
    job: Job
  ): DataSet[(K, V)]
}

Serialization Configuration

Configure Kryo serialization and type registration for custom types.

class ExecutionEnvironment {
  /**
   * Registers a type with a Kryo serializer instance.
   * @tparam T Serializer type; must extend `Serializer[_]` and be `Serializable`
   * @param clazz Class to register
   * @param serializer Serializer instance
   */
  def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
    clazz: Class[_], 
    serializer: T
  ): Unit
  
  /**
   * Registers a type with a Kryo serializer class.
   * @param clazz Class to register
   * @param serializer Serializer class (a subclass of `Serializer[_]`)
   */
  def registerTypeWithKryoSerializer(
    clazz: Class[_], 
    serializer: Class[_ <: Serializer[_]]
  ): Unit
  
  /**
   * Adds a default Kryo serializer class for a type.
   * @param clazz Class the default serializer applies to
   * @param serializer Serializer class (a subclass of `Serializer[_]`)
   */
  def addDefaultKryoSerializer(
    clazz: Class[_], 
    serializer: Class[_ <: Serializer[_]]
  ): Unit
  
  /**
   * Adds a default Kryo serializer instance for a type.
   * @tparam T Serializer type; must extend `Serializer[_]` and be `Serializable`
   * @param clazz Class the default serializer applies to
   * @param serializer Serializer instance
   */
  def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](
    clazz: Class[_], 
    serializer: T
  ): Unit
  
  /**
   * Registers a single type with Kryo (uses default serialization).
   * @param typeClass Class to register
   */
  def registerType(typeClass: Class[_]): Unit
  
  /**
   * Registers multiple types with Kryo.
   * @param types Classes to register (varargs)
   */
  def registerType(types: Class[_]*): Unit
  
  /**
   * Enables or disables forced Kryo serialization for all types.
   * @param forceKryo Whether to force Kryo for all serialization
   */
  def setForceKryo(forceKryo: Boolean): Unit
  
  /**
   * Enables or disables forced Avro serialization for generic types.
   * @param forceAvro Whether to force Avro serialization
   */
  def setForceAvro(forceAvro: Boolean): Unit
  
  /**
   * Disables auto type registration with Kryo.
   */
  def disableAutoTypeRegistration(): Unit
  
  /**
   * Enables auto type registration with Kryo.
   */
  def enableAutoTypeRegistration(): Unit
  
  /**
   * Reports whether auto type registration is enabled.
   * @return True if auto registration is enabled
   */
  def hasAutoTypeRegistrationEnabled: Boolean
}

Job Execution

Execute jobs and retrieve results, with support for both synchronous and asynchronous execution.

class ExecutionEnvironment {
  /**
   * Executes the job and waits for completion.
   * @return JobExecutionResult with execution statistics
   */
  def execute(): JobExecutionResult
  
  /**
   * Executes the job under a custom name and waits for completion.
   * @param jobName Name for the job
   * @return JobExecutionResult with execution statistics
   */
  def execute(jobName: String): JobExecutionResult
  
  /**
   * Executes the job asynchronously.
   * @return JobClient for monitoring execution
   */
  def executeAsync(): JobClient
  
  /**
   * Executes the job asynchronously under a custom name.
   * @param jobName Name for the job
   * @return JobClient for monitoring execution
   */
  def executeAsync(jobName: String): JobClient
  
  /**
   * Gets the result of the last job execution.
   * @return JobExecutionResult from last execution
   */
  def getLastJobExecutionResult: JobExecutionResult
}

Job Listeners and Monitoring

Register listeners for job lifecycle events.

class ExecutionEnvironment {
  /**
   * Registers a listener that is notified of job execution events.
   * @param jobListener Listener for job events
   */
  def registerJobListener(jobListener: JobListener): Unit
  
  /**
   * Removes every job listener registered so far.
   */
  def clearJobListeners(): Unit
}

Program Planning

Access to execution plan generation and optimization details.

class ExecutionEnvironment {
  /**
   * Gets the execution plan as a JSON string.
   * @return Execution plan in JSON format
   */
  def getExecutionPlan(): String
  
  /**
   * Creates a program plan for optimization.
   * @param jobName Job name; may be omitted, in which case it defaults to the empty string
   * @return Program plan object
   */
  def createProgramPlan(jobName: String = ""): Plan
}

Distributed Cache

Register files for distribution to all cluster nodes.

class ExecutionEnvironment {
  /**
   * Registers a file in the distributed cache.
   * @param filePath Path to the file
   * @param name Name under which the cached file is accessed
   * @param executable Whether the file should be executable (default: false)
   */
  def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
}

Utility Methods

Additional utility methods for DataSet operations.

class ExecutionEnvironment {
  /**
   * Creates the union of multiple DataSets of the same element type.
   * @tparam T Element type shared by all input DataSets and the result
   * @param sets Sequence of DataSets to union
   * @return Unified DataSet
   */
  def union[T](sets: Seq[DataSet[T]]): DataSet[T]
}

Types

/** Restart strategy settings, as accepted by `setRestartStrategy` and returned by `getRestartStrategy`. */
trait RestartStrategyConfiguration

/** Result of a job execution, as returned by `execute` and `getLastJobExecutionResult`. */
class JobExecutionResult {
  /** Identifier of the executed job. */
  def getJobID: JobID
  /** Net runtime of the job (presumably in milliseconds; confirm against the real API). */
  def getNetRuntime: Long
  /** Net runtime of the job expressed in the given time unit. */
  def getNetRuntime(timeUnit: TimeUnit): Long
  /** All accumulator results, keyed by name (presumably the accumulator name; confirm). */
  def getAllAccumulatorResults: java.util.Map[String, Object]
  /** Result of the accumulator with the given name, typed as `T`. */
  def getAccumulatorResult[T](accumulatorName: String): T
}

/** Handle to a submitted job, as returned by `executeAsync`. */
trait JobClient {
  /** Identifier of the job this client refers to. */
  def getJobID: JobID
  /** Requests cancellation of the job; the returned future carries no value. */
  def cancel(): CompletableFuture[Void]
  /**
   * Stops the job with a savepoint.
   * @param advanceToEndOfEventTime Flag passed through to the stop request
   *        (presumably whether event time is advanced to its end; confirm)
   * @param savepointDirectory Directory for the savepoint
   * @return Future of a String (presumably the savepoint path; confirm)
   */
  def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
}

/** Callback interface for job events, as accepted by `registerJobListener`. */
trait JobListener {
  /**
   * Called for a job submission.
   * @param jobClient Client for the submitted job
   * @param t Throwable associated with the submission (presumably non-null
   *          only on failure; confirm against the real API)
   */
  def onJobSubmitted(jobClient: JobClient, t: Throwable): Unit
  /**
   * Called for a job execution.
   * @param jobExecutionResult Result of the execution
   * @param t Throwable associated with the execution (presumably non-null
   *          only on failure; confirm against the real API)
   */
  def onJobExecuted(jobExecutionResult: JobExecutionResult, t: Throwable): Unit
}

/** Program plan, as returned by `createProgramPlan`. */
abstract class Plan {
  /** Gets the job name of this plan. */
  def getJobName: String
  /** Sets the job name of this plan. */
  def setJobName(jobName: String): Unit
}