Execution Environment

The ExecutionEnvironment is the main entry point for Apache Flink batch programs. It provides methods to create DataSets from various sources, control job execution parameters, and submit jobs for execution.

Creating Execution Environments

object ExecutionEnvironment {
  // Get environment based on execution context (local or cluster)
  def getExecutionEnvironment: ExecutionEnvironment
  
  // Create local execution environment
  def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
  
  // Create environment for testing with collections
  def createCollectionsEnvironment: ExecutionEnvironment
  
  // Create remote execution environment  
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*): ExecutionEnvironment
}
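
createCollectionsEnvironment runs the program single-threaded on Java collections inside the client JVM, which keeps small tests fast and easy to debug. A minimal sketch (the sample data is illustrative):

import org.apache.flink.api.scala._

// Executes entirely in the local JVM without parallelism; handy for unit tests
val testEnv = ExecutionEnvironment.createCollectionsEnvironment

val squares = testEnv.fromElements(1, 2, 3, 4).map(x => x * x)
squares.print()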

Configuration Methods

class ExecutionEnvironment {
  // Get the underlying Java execution environment
  def getJavaEnv: JavaEnv
  
  // Get execution configuration  
  def getConfig: ExecutionConfig
  
  // Set default parallelism for all operations
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  
  // Configure restart strategy
  def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit  
  def getRestartStrategy: RestartStrategyConfiguration
  
  // Set number of execution retries (deprecated)
  def setNumberOfExecutionRetries(numRetries: Int): Unit
  def getNumberOfExecutionRetries: Int
  
  // Kryo serialization registration
  def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
    clazz: Class[_], 
    serializer: T
  ): Unit
  def registerTypeWithKryoSerializer(
    clazz: Class[_], 
    serializer: Class[_ <: Serializer[_]]
  ): Unit
  def registerType(typeClass: Class[_]): Unit
  
  // Cached file registration
  def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
  
  // Default parallelism for local execution
  def setDefaultLocalParallelism(parallelism: Int): Unit
}
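
As a sketch of how these configuration methods are typically combined (the restart values and cached-file path are illustrative):

import org.apache.flink.api.scala._
import org.apache.flink.api.common.restartstrategy.RestartStrategies

val env = ExecutionEnvironment.getExecutionEnvironment

// Default parallelism for operators that do not set their own
env.setParallelism(8)

// Restart the job up to 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))

// Distribute a file to all workers; rich functions can read it via
// getRuntimeContext.getDistributedCache.getFile("lookup-file")
env.registerCachedFile("hdfs:///lookup/data.csv", "lookup-file")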

Job Management

class ExecutionEnvironment {
  // Get unique job identifier
  def getId: JobID
  def getIdString: String
  
  // Session management
  def startNewSession(): Unit
  def setSessionTimeout(timeout: Long): Unit  
  def getSessionTimeout: Long
  
  // Execution results
  def getLastJobExecutionResult: JobExecutionResult
  
  // Buffer timeout configuration 
  def setBufferTimeout(timeoutMillis: Long): ExecutionEnvironment
  def getBufferTimeout: Long
}
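
The result of the most recent run stays available on the environment; a minimal sketch (the output path is illustrative):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
println(s"Job ID: ${env.getIdString}")

env.generateSequence(1, 1000).map(_ * 2).writeAsText("file:///tmp/doubled")
env.execute("Doubling Job")

// Runtime statistics of the most recent execution
val lastResult = env.getLastJobExecutionResult
println(s"Job ran for ${lastResult.getNetRuntime} ms")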

Job Execution

class ExecutionEnvironment {
  // Execute the job and return results
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  
  // Get execution plan as string
  def getExecutionPlan(): String
}
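
getExecutionPlan returns the plan as a JSON string that can be inspected or pasted into Flink's plan visualizer. A minimal sketch (a sink must be defined before the plan can be generated; the path is illustrative):

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

env.fromElements(1, 2, 3)
  .map(_ + 1)
  .writeAsText("file:///tmp/plan-demo")

// Print the JSON execution plan without (or before) actually running the job
println(env.getExecutionPlan())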

Data Source Creation

class ExecutionEnvironment {
  // Create DataSet from collections
  def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
  def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]
  def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]
  def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]
  
  // Generate sequence of numbers
  def generateSequence(from: Long, to: Long): DataSet[Long]
  
  // Read from files
  def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
  def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
  // Also accepts optional arguments for delimiters, quoting, ignored lines and field selection
  def readCsvFile[T: ClassTag : TypeInformation](filePath: String): DataSet[T]
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]
  def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]
  
  // Custom input formats
  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
  
  // Hadoop integration
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapredInputFormat[K, V], 
    keyClass: Class[K], 
    valueClass: Class[V], 
    inputPath: String
  ): DataSet[(K, V)]
  
  def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
    inputFormat: MapreduceInputFormat[K, V], 
    keyClass: Class[K], 
    valueClass: Class[V], 
    inputPath: String
  ): DataSet[(K, V)]
}
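
A few of these sources in combination, as a sketch (the Person case class and file paths are illustrative):

import org.apache.flink.api.scala._

case class Person(name: String, age: Int)

val env = ExecutionEnvironment.getExecutionEnvironment

// In-memory sources
val numbers: DataSet[Int] = env.fromElements(1, 2, 3, 4, 5)
val sequence: DataSet[Long] = env.generateSequence(1, 100)

// File sources
val lines: DataSet[String] = env.readTextFile("file:///tmp/input.txt")
val people: DataSet[Person] = env.readCsvFile[Person]("file:///tmp/people.csv")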

Usage Examples

Basic Environment Setup

import org.apache.flink.api.scala._

// Get execution environment (local or cluster based on context)
val env = ExecutionEnvironment.getExecutionEnvironment

// Set parallelism
env.setParallelism(4)

// Create data and print it; in the batch API print() is eager and submits the job itself
val data = env.fromElements(1, 2, 3, 4, 5)
data.print()

// execute() is only needed when the program ends in lazy sinks such as writeAsText/writeAsCsv
// env.execute("My Flink Job")

Local Environment for Testing

import org.apache.flink.api.scala._

// Create local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(2)

val data = localEnv.fromCollection(List("hello", "world", "flink"))
val result = data.map(_.toUpperCase)
result.print() // print() eagerly triggers execution, so no separate execute() call is needed

Remote Cluster Execution

import org.apache.flink.api.scala._

// Connect to remote Flink cluster
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  "flink-jobmanager", 
  6123, 
  "/path/to/my-job.jar"
)

val data = remoteEnv.readTextFile("hdfs://data/input.txt")
val processed = data
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .groupBy(0)
  .sum(1)
processed.writeAsText("hdfs://data/output")

remoteEnv.execute("Word Count Job")

Accumulators

Accumulators are a simple and efficient means to aggregate values from distributed functions back to the client program.

Accumulator Registration

Accumulators are registered from inside rich functions via getRuntimeContext.addAccumulator(name, accumulator); see the usage example below. On the ExecutionEnvironment itself, the related methods cover serialization of custom types:

class ExecutionEnvironment {
  // Kryo serializer and type registration for custom types
  def addDefaultKryoSerializer[T](clazz: Class[T], serializer: Serializer[T]): Unit
  def addDefaultKryoSerializer[T](clazz: Class[T], serializerClass: Class[_ <: Serializer[T]]): Unit
  def registerKryoType(clazz: Class[_]): Unit
}
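
A sketch of registering a custom type and Kryo serializer (LegacyPoint and its serializer are hypothetical placeholders):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.flink.api.scala._

// Hypothetical type that Flink would otherwise fall back to generic Kryo serialization for
class LegacyPoint(val x: Double, val y: Double)

// Minimal Kryo serializer for LegacyPoint
class LegacyPointSerializer extends Serializer[LegacyPoint] with Serializable {
  override def write(kryo: Kryo, output: Output, p: LegacyPoint): Unit = {
    output.writeDouble(p.x)
    output.writeDouble(p.y)
  }
  override def read(kryo: Kryo, input: Input, clazz: Class[LegacyPoint]): LegacyPoint =
    new LegacyPoint(input.readDouble(), input.readDouble())
}

val env = ExecutionEnvironment.getExecutionEnvironment
env.registerKryoType(classOf[LegacyPoint])
env.addDefaultKryoSerializer(classOf[LegacyPoint], classOf[LegacyPointSerializer])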

Accumulator Types

// Basic accumulator types
trait Accumulator[V, R] {
  def add(value: V): Unit
  def getLocalValue: R
  def resetLocal(): Unit
  def merge(other: Accumulator[V, R]): Unit
  def clone(): Accumulator[V, R]
}

// Built-in accumulator implementations
class IntCounter extends Accumulator[Int, Int]
class LongCounter extends Accumulator[Long, Long] 
class DoubleCounter extends Accumulator[Double, Double]
class Histogram extends Accumulator[Int, java.util.Map[Int, Int]]

// List accumulator for collecting values
class ListAccumulator[T] extends Accumulator[T, java.util.ArrayList[T]]

// Maximum/Minimum accumulators
class IntMaximum extends Accumulator[Int, Int]
class IntMinimum extends Accumulator[Int, Int]
class DoubleMaximum extends Accumulator[Double, Double] 
class DoubleMinimum extends Accumulator[Double, Double]

Usage Examples

import org.apache.flink.api.scala._
import org.apache.flink.api.common.accumulators.{IntCounter, ListAccumulator}
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

val env = ExecutionEnvironment.getExecutionEnvironment

// Example using accumulator to count processed records
class CountingMapFunction extends RichMapFunction[String, String] {
  var counter: IntCounter = _
  var errorList: ListAccumulator[String] = _
  
  override def open(config: Configuration): Unit = {
    counter = new IntCounter()
    errorList = new ListAccumulator[String]()
    
    // Register accumulators with runtime context
    getRuntimeContext.addAccumulator("processed-count", counter)
    getRuntimeContext.addAccumulator("errors", errorList)
  }
  
  override def map(value: String): String = {
    counter.add(1)
    
    if (value.contains("error")) {
      errorList.add(value)
      "ERROR_PROCESSED"
    } else {
      value.toUpperCase
    }
  }
}

val data = env.fromElements("hello", "error1", "world", "error2", "flink")
val result = data.map(new CountingMapFunction())

// A sink is required before execute(); discard the output since only the accumulators matter here
result.output(new DiscardingOutputFormat[String]())

// Execute and get accumulator results
val jobResult = env.execute("Accumulator Example")
val processedCount = jobResult.getAccumulatorResult[Int]("processed-count")
val errors = jobResult.getAccumulatorResult[java.util.List[String]]("errors")

println(s"Processed $processedCount records")
println(s"Found errors: ${errors.size()}")