The ExecutionEnvironment is the main entry point for Apache Flink batch programs. It provides methods to create DataSets from various sources, control job execution parameters, and submit jobs for execution.
object ExecutionEnvironment {
// Get environment based on execution context (local or cluster)
def getExecutionEnvironment: ExecutionEnvironment
// Create local execution environment
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
// Create environment for testing with collections
def createCollectionsEnvironment: ExecutionEnvironment
// Create remote execution environment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*): ExecutionEnvironment
}
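For quick local tests the factory methods above can be combined as in the following minimal sketch; createCollectionsEnvironment executes the program on Java collections inside the client JVM, which is convenient for unit tests.

import org.apache.flink.api.scala._

// runs on Java collections inside the client JVM, no cluster needed
val testEnv = ExecutionEnvironment.createCollectionsEnvironment
val squares = testEnv.fromElements(1, 2, 3).map(x => x * x)
squares.print() // print() triggers execution and outputs 1, 4, 9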
class ExecutionEnvironment {
// Get the underlying Java execution environment
def getJavaEnv: JavaEnv
// Get execution configuration
def getConfig: ExecutionConfig
// Set default parallelism for all operations
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
// Configure restart strategy
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
def getRestartStrategy: RestartStrategyConfiguration
// Set number of execution retries (deprecated)
def setNumberOfExecutionRetries(numRetries: Int): Unit
def getNumberOfExecutionRetries: Int
// Kryo serialization registration
def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](
clazz: Class[_],
serializer: T
): Unit
def registerTypeWithKryoSerializer(
clazz: Class[_],
serializer: Class[_ <: Serializer[_]]
): Unit
def registerType(typeClass: Class[_]): Unit
// Cached file registration
def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
// Default parallelism for local execution
def setDefaultLocalParallelism(parallelism: Int): Unit
}
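A minimal sketch tying the configuration methods together; the restart strategy values, the MyEvent type, and the file paths are placeholders chosen for illustration.

import org.apache.flink.api.scala._
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import java.util.concurrent.TimeUnit

case class MyEvent(id: Long, payload: String) // placeholder type

val env = ExecutionEnvironment.getExecutionEnvironment

// default parallelism for all operators of this job
env.setParallelism(8)

// retry the job up to 3 times, waiting 10 seconds between attempts
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)))

// eagerly register the type with Flink's serialization stack (POJO or Kryo)
env.registerType(classOf[MyEvent])

// ship a file to every TaskManager; rich functions can read it via
// getRuntimeContext.getDistributedCache.getFile("geo-db")
env.registerCachedFile("hdfs:///data/geo.db", "geo-db") // placeholder path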
class ExecutionEnvironment {
// Get unique job identifier
def getId: JobID
def getIdString: String
// Session management
def startNewSession(): Unit
def setSessionTimeout(timeout: Long): Unit
def getSessionTimeout: Long
// Execution results
def getLastJobExecutionResult: JobExecutionResult
// Buffer timeout configuration
def setBufferTimeout(timeoutMillis: Long): ExecutionEnvironment
def getBufferTimeout: Long
}
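A short sketch of inspecting the result of a finished job; getNetRuntime and getJobID come from JobExecutionResult, and the output path is a placeholder.

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.fromElements(1, 2, 3).writeAsText("file:///tmp/numbers") // placeholder path

val result = env.execute("Metadata Example")
println(s"Job ${result.getJobID} finished in ${result.getNetRuntime} ms")

// the same information can also be retrieved later from the environment
val lastResult = env.getLastJobExecutionResult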
class ExecutionEnvironment {
// Execute the job and return results
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
// Get execution plan as string
def getExecutionPlan(): String
}
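A sketch of dumping the optimizer plan as JSON, e.g. for the Flink plan visualizer; a sink has to be defined before the plan can be generated, and the output path is a placeholder.

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val counts = env.fromElements("a", "b", "a").map((_, 1)).groupBy(0).sum(1)
counts.writeAsText("file:///tmp/word-counts") // placeholder path

// JSON representation of the data flow
println(env.getExecutionPlan())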
class ExecutionEnvironment {
// Create DataSet from collections
def fromCollection[T: ClassTag : TypeInformation](data: Iterable[T]): DataSet[T]
def fromCollection[T: ClassTag : TypeInformation](data: Iterator[T]): DataSet[T]
def fromElements[T: ClassTag : TypeInformation](data: T*): DataSet[T]
def fromParallelCollection[T: ClassTag : TypeInformation](data: SplittableIterator[T]): DataSet[T]
// Generate sequence of numbers
def generateSequence(from: Long, to: Long): DataSet[Long]
// Read from files
def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
def readCsvFile[T: ClassTag : TypeInformation](filePath: String): DataSet[T] // plus optional named parameters such as fieldDelimiter, ignoreFirstLine and includedFields
def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, delimiter: String): DataSet[T]
def readFileOfPrimitives[T: ClassTag : TypeInformation](filePath: String, typeClass: Class[T]): DataSet[T]
// Custom input formats
def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
// Hadoop integration
def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
inputFormat: MapredInputFormat[K, V],
keyClass: Class[K],
valueClass: Class[V],
inputPath: String
): DataSet[(K, V)]
def readHadoopFile[K: ClassTag : TypeInformation, V: ClassTag : TypeInformation](
inputFormat: MapreduceInputFormat[K, V],
keyClass: Class[K],
valueClass: Class[V],
inputPath: String
): DataSet[(K, V)]
}
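A hedged sketch of the CSV reader; the Person case class, the file path, and the CSV options are placeholders. In the Scala API, readCsvFile maps the columns onto case class fields or tuple positions.

import org.apache.flink.api.scala._

case class Person(name: String, age: Int) // placeholder type matching the CSV columns

val env = ExecutionEnvironment.getExecutionEnvironment

// map the two CSV columns onto the case class fields, skipping the header row
val people = env.readCsvFile[Person]("file:///tmp/people.csv", ignoreFirstLine = true)

// tuples work as well
val rows = env.readCsvFile[(String, Int)]("file:///tmp/people.csv", ignoreFirstLine = true)

people.print()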
import org.apache.flink.api.scala._

// Get execution environment (local or cluster based on context)
val env = ExecutionEnvironment.getExecutionEnvironment
// Set parallelism
env.setParallelism(4)
// Create data and execute
val data = env.fromElements(1, 2, 3, 4, 5)
// define a sink, then execute; data.print() would instead trigger execution immediately on its own
data.writeAsText("file:///tmp/flink-numbers") // example output path
env.execute("My Flink Job")

import org.apache.flink.api.scala._
// Create local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(2)
val data = localEnv.fromCollection(List("hello", "world", "flink"))
val result = data.map(_.toUpperCase)
// print() collects the result to the client and triggers execution immediately,
// so a separate localEnv.execute() call is not needed (and would fail with no sinks defined)
result.print()

import org.apache.flink.api.scala._
// Connect to remote Flink cluster
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
"flink-jobmanager",
6123,
"/path/to/my-job.jar"
)
val data = remoteEnv.readTextFile("hdfs://data/input.txt")
val processed = data.flatMap(_.split(" ")).groupBy(identity).sum(1)
processed.writeAsText("hdfs://data/output")
remoteEnv.execute("Word Count Job")Accumulators are a simple and efficient means to aggregate values from distributed functions back to the client program.
class ExecutionEnvironment {
// Additional Kryo serializer configuration: default serializers and eager type registration
def addDefaultKryoSerializer[T](clazz: Class[T], serializer: Serializer[T]): Unit
def addDefaultKryoSerializer[T](clazz: Class[T], serializerClass: Class[_ <: Serializer[T]]): Unit
def registerKryoType(clazz: Class[_]): Unit
}
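A sketch of wiring a custom Kryo serializer into the environment; MyType and MyTypeSerializer are placeholders for your own classes.

import org.apache.flink.api.scala._
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// placeholder type that Flink would otherwise serialize with generic Kryo
class MyType(var id: Int, var payload: String)

// placeholder Kryo serializer for MyType
class MyTypeSerializer extends Serializer[MyType] with Serializable {
  override def write(kryo: Kryo, output: Output, obj: MyType): Unit = {
    output.writeInt(obj.id)
    output.writeString(obj.payload)
  }
  override def read(kryo: Kryo, input: Input, clazz: Class[MyType]): MyType =
    new MyType(input.readInt(), input.readString())
}

val env = ExecutionEnvironment.getExecutionEnvironment
// use this serializer whenever Kryo has to handle MyType
env.addDefaultKryoSerializer(classOf[MyType], classOf[MyTypeSerializer])
// pre-register the type so Kryo writes a compact tag instead of the full class name
env.registerKryoType(classOf[MyType])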
// Basic accumulator types
trait Accumulator[V, R] {
def add(value: V): Unit
def getLocalValue: R
def resetLocal(): Unit
def merge(other: Accumulator[V, R]): Unit
def clone(): Accumulator[V, R]
}
// Built-in accumulator implementations
class IntCounter extends Accumulator[Int, Int]
class LongCounter extends Accumulator[Long, Long]
class DoubleCounter extends Accumulator[Double, Double]
class Histogram extends Accumulator[Int, java.util.Map[Int, Int]]
// List accumulator for collecting values
class ListAccumulator[T] extends Accumulator[T, java.util.ArrayList[T]]
// Maximum/Minimum accumulators
class IntMaximum extends Accumulator[Int, Int]
class IntMinimum extends Accumulator[Int, Int]
class DoubleMaximum extends Accumulator[Double, Double]
class DoubleMinimum extends Accumulator[Double, Double]

import org.apache.flink.api.scala._
import org.apache.flink.api.common.accumulators.{IntCounter, ListAccumulator}
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.java.io.DiscardingOutputFormat
val env = ExecutionEnvironment.getExecutionEnvironment
// Example using accumulator to count processed records
class CountingMapFunction extends RichMapFunction[String, String] {
var counter: IntCounter = _
var errorList: ListAccumulator[String] = _
override def open(config: Configuration): Unit = {
counter = new IntCounter()
errorList = new ListAccumulator[String]()
// Register accumulators with runtime context
getRuntimeContext.addAccumulator("processed-count", counter)
getRuntimeContext.addAccumulator("errors", errorList)
}
override def map(value: String): String = {
counter.add(1)
if (value.contains("error")) {
errorList.add(value)
return "ERROR_PROCESSED"
}
value.toUpperCase
}
}
val data = env.fromElements("hello", "error1", "world", "error2", "flink")
val result = data.map(new CountingMapFunction())
// the job needs at least one sink; discard the mapped records since only the accumulators matter here
result.output(new DiscardingOutputFormat[String]())
// Execute and get accumulator results
val jobResult = env.execute("Accumulator Example")
val processedCount = jobResult.getAccumulatorResult[Int]("processed-count")
val errors = jobResult.getAccumulatorResult[java.util.List[String]]("errors")
println(s"Processed $processedCount records")
println(s"Found errors: ${errors.size()}")