The Apache Flink Scala API provides type-safe operations and a functional programming style for building distributed stream and batch processing applications.
The ExecutionEnvironment is the entry point for all Flink Scala programs. It provides the context for creating DataSets and configuring execution parameters.
object ExecutionEnvironment {
def getExecutionEnvironment: ExecutionEnvironment
def createLocalEnvironment(parallelism: Int = Runtime.getRuntime.availableProcessors()): ExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment
def createCollectionsEnvironment: ExecutionEnvironment
def createLocalEnvironmentWithWebUI(config: Configuration): ExecutionEnvironment
}

import org.apache.flink.api.scala._
// Get default execution environment (local or cluster based on context)
val env = ExecutionEnvironment.getExecutionEnvironment
// Create local environment with specific parallelism
val localEnv = ExecutionEnvironment.createLocalEnvironment(4)
// Create remote environment
val remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
host = "jobmanager",
port = 8081,
jarFiles = "myapp.jar"
)
// Create collections-based environment (for testing)
val collEnv = ExecutionEnvironment.createCollectionsEnvironment

class ExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
def getConfig: ExecutionConfig
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
def registerJobListener(jobListener: JobListener): Unit
def configure(configuration: Configuration, classLoader: ClassLoader): Unit
}

val env = ExecutionEnvironment.getExecutionEnvironment
// Set parallelism
env.setParallelism(8)
// Configure execution settings
val config = env.getConfig
config.enableClosureCleaner()
import org.apache.flink.api.java.utils.ParameterTool
config.setGlobalJobParameters(ParameterTool.fromArgs(args))
// Set restart strategy
import org.apache.flink.api.common.restartstrategy.RestartStrategies
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
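
Fixed-delay restarts are just one option; a failure-rate strategy can be configured as well. A brief sketch, reusing the RestartStrategies import above:

import org.apache.flink.api.common.time.Time
// Allow at most 3 failures within a 5-minute window, with 10 seconds between restart attempts
env.setRestartStrategy(
  RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10))
)
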
class ExecutionEnvironment {
def fromElements[T: ClassTag: TypeInformation](data: T*): DataSet[T]
def fromCollection[T: ClassTag: TypeInformation](data: Iterable[T]): DataSet[T]
def fromCollection[T: ClassTag: TypeInformation](
data: Iterator[T],
tpe: TypeInformation[T]
): DataSet[T]
}

// From individual elements
val numbers = env.fromElements(1, 2, 3, 4, 5)
val words = env.fromElements("Hello", "World", "Flink")
// From collections
val list = List("A", "B", "C")
val dataSet = env.fromCollection(list)
// From iterator with explicit type information
val iterator = Iterator(("Alice", 25), ("Bob", 30))
val people = env.fromCollection(iterator, Types.TUPLE[(String, Int)])
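
Because the Scala API derives TypeInformation implicitly, DataSet elements are not limited to tuples; case classes work as well. A minimal sketch, using a hypothetical Person case class:

// Case classes are supported element types; TypeInformation is derived implicitly
case class Person(name: String, age: Int)
val peopleDs = env.fromElements(Person("Alice", 25), Person("Bob", 30))
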
class ExecutionEnvironment {
def readTextFile(filePath: String, charsetName: String = "UTF-8"): DataSet[String]
def readTextFileWithValue(filePath: String, charsetName: String = "UTF-8"): DataSet[StringValue]
def readCsvFile[T: ClassTag: TypeInformation](
filePath: String,
lineDelimiter: String = "\n",
fieldDelimiter: String = ",",
ignoreFirstLine: Boolean = false,
lenient: Boolean = false,
includedFields: Array[Int] = null
): DataSet[T]
def readFileOfPrimitives[T: ClassTag: TypeInformation](
filePath: String,
delimiter: String,
tpe: Class[T]
): DataSet[T]
def readFile[T: ClassTag: TypeInformation](
inputFormat: FileInputFormat[T],
filePath: String
): DataSet[T]
def createInput[T: ClassTag: TypeInformation](inputFormat: InputFormat[T, _]): DataSet[T]
}// Read text file
val lines = env.readTextFile("path/to/input.txt")
// Read CSV file as tuples
val csvData = env.readCsvFile[(String, Int, Double)](
filePath = "data.csv",
ignoreFirstLine = true,
fieldDelimiter = ","
)
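
readCsvFile can also produce a case class instead of a tuple, mapping CSV columns to the case class fields by position. A brief sketch, assuming a hypothetical sales.csv with three columns:

// Hypothetical case class whose fields match the CSV columns by position
case class Sale(product: String, quantity: Int, price: Double)
val sales = env.readCsvFile[Sale]("sales.csv", ignoreFirstLine = true)
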
// Read file of primitives (numbers, strings, etc.)
val numbers = env.readFileOfPrimitives("numbers.txt", "\n", classOf[Int])
val strings = env.readFileOfPrimitives("words.txt", " ", classOf[String])
// Read with custom file input format
val customData = env.readFile(new MyInputFormat(), "path/to/data")
// Create DataSet from generic input format
val inputFormat = new MyCustomInputFormat[MyType]()
val dataSet = env.createInput(inputFormat)

class ExecutionEnvironment {
def generateSequence(from: Long, to: Long): DataSet[Long]
def fromParallelCollection[T: ClassTag: TypeInformation](
c: SplittableIterator[T]
): DataSet[T]
}

// Generate sequence of numbers
val sequence = env.generateSequence(1, 1000000)
// Create from parallel collection
import org.apache.flink.util.NumberSequenceIterator
val parallelData = env.fromParallelCollection(new NumberSequenceIterator(1L, 100L))

class ExecutionEnvironment {
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
def executeAsync(): JobClient
def executeAsync(jobName: String): JobClient
def getExecutionPlan: String
def getLastJobExecutionResult: JobExecutionResult
}

val env = ExecutionEnvironment.getExecutionEnvironment
// Create and transform data
val result = env.fromElements(1, 2, 3, 4, 5)
.map(_ * 2)
.filter(_ > 5)
// Print results (print() is an eager sink and triggers execution immediately)
result.print()
// Alternatively, define lazy sinks (e.g. writeAsText) and execute explicitly with a job name
val jobResult = env.execute("My Flink Job")
println(s"Job took ${jobResult.getNetRuntime} ms")
// Execute asynchronously
val jobClient = env.executeAsync("Async Job")
val completableFuture = jobClient.getJobExecutionResult
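
getExecutionPlan returns the optimizer's plan as a JSON string, which can help when inspecting a data flow before running it. A brief sketch, assuming a sink with a hypothetical output path has been defined but not yet executed:

// Define a data flow with a sink, then print the JSON execution plan without running the job
env.fromElements(1, 2, 3).map(_ * 2).writeAsText("/tmp/plan-demo")
println(env.getExecutionPlan)
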
val config = env.getConfig
// Register types with Kryo
config.registerKryoType(classOf[MyClass])
config.registerTypeWithKryoSerializer(classOf[MyClass], classOf[MyClassSerializer])
// Add default Kryo serializers
// LocalDateTimeSerializer stands in for a custom or third-party Kryo serializer class
config.addDefaultKryoSerializer(classOf[LocalDateTime], classOf[LocalDateTimeSerializer])

val config = env.getConfig
// Enable/disable closure cleaner (default: enabled)
config.enableClosureCleaner()
config.disableClosureCleaner()

class ExecutionEnvironment {
def registerCachedFile(filePath: String, name: String, executable: Boolean = false): Unit
}

// Register cached files for distributed access
env.registerCachedFile("hdfs://path/to/data.txt", "data")
env.registerCachedFile("s3://bucket/executable.sh", "script", executable = true)
// Access cached files in user functions via RuntimeContext
class MyMapFunction extends RichMapFunction[String, String] {
override def map(value: String): String = {
val cachedFile = getRuntimeContext.getDistributedCache.getFile("data")
// Use cached file...
value
}
}

import org.apache.flink.api.java.utils.ParameterTool
val params = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
// Access in transformation functions
class MyMapFunction extends RichMapFunction[String, String] {
override def map(value: String): String = {
val params = getRuntimeContext.getExecutionConfig.getGlobalJobParameters
.asInstanceOf[ParameterTool]
val prefix = params.get("prefix", "default")
s"$prefix: $value"
}
}

class ExecutionConfig {
def setParallelism(parallelism: Int): ExecutionConfig
def getParallelism: Int
def enableClosureCleaner(): ExecutionConfig
def disableClosureCleaner(): ExecutionConfig
def isClosureCleanerEnabled: Boolean
def setGlobalJobParameters(globalJobParameters: GlobalJobParameters): ExecutionConfig
def getGlobalJobParameters: GlobalJobParameters
def registerKryoType(tpe: Class[_]): ExecutionConfig
def registerTypeWithKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
def addDefaultKryoSerializer[T](tpe: Class[T], serializer: Class[_ <: Serializer[T]]): ExecutionConfig
}
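
Most ExecutionConfig setters return the config itself, so calls can be chained. A small sketch, reusing the env from the earlier examples:

val execConfig = env.getConfig
execConfig.setParallelism(4).enableClosureCleaner()
// Read the configuration back
println(s"parallelism=${execConfig.getParallelism}, closureCleaner=${execConfig.isClosureCleanerEnabled}")
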
class JobExecutionResult {
def getNetRuntime: Long
def getAccumulatorResult[T](accumulatorName: String): T
def getAllAccumulatorResults: java.util.Map[String, Object]
}
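
Accumulator values are read from the JobExecutionResult once the job completes. A sketch, assuming a hypothetical "line-count" accumulator registered in a rich function and hypothetical file paths:

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

class CountingMapper extends RichMapFunction[String, String] {
  private val counter = new IntCounter()

  // Register the accumulator once per task
  override def open(parameters: Configuration): Unit =
    getRuntimeContext.addAccumulator("line-count", counter)

  override def map(value: String): String = {
    counter.add(1)
    value
  }
}

val counted = env.readTextFile("input.txt").map(new CountingMapper())
counted.writeAsText("/tmp/counted")

val execResult = env.execute("Count lines")
val lineCount = execResult.getAccumulatorResult[Integer]("line-count")
println(s"Lines processed: $lineCount")
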
trait JobClient {
def getJobExecutionResult: CompletableFuture[JobExecutionResult]
def getJobStatus: CompletableFuture[JobStatus]
def cancel(): CompletableFuture[Void]
def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
}
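
The JobClient returned by executeAsync can also be used to monitor or cancel a running job. A brief sketch, reusing the jobClient obtained from executeAsync above:

// Block on the status future, then cancel the job if it has not reached a terminal state
val status = jobClient.getJobStatus.get()
println(s"Job status: $status")
if (!status.isGloballyTerminalState) {
  jobClient.cancel().get()
}
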
Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-scala-2-12