Apache Flink Scala API for DataStream processing with type-safe, functional programming constructs for building streaming data processing applications.
The StreamExecutionEnvironment is the main entry point for all Flink streaming programs. It provides the context for creating data streams and configuring the runtime behavior of streaming applications.
object StreamExecutionEnvironment {
def getExecutionEnvironment: StreamExecutionEnvironment
def createLocalEnvironment(): StreamExecutionEnvironment
def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
def createLocalEnvironmentWithWebUI(config: Configuration): StreamExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
def setDefaultLocalParallelism(parallelism: Int): Unit
def getDefaultLocalParallelism: Int
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
// Get context-appropriate environment (local or cluster)
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create local environment with specific parallelism
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)
// Create remote cluster environment
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", 8081, "/path/to/your-program.jar"
)

class StreamExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
def setMaxParallelism(maxParallelism: Int): Unit
def getMaxParallelism: Int
}

class StreamExecutionEnvironment {
def setBufferTimeout(timeoutMillis: Long): Unit
def getBufferTimeout: Long
def disableOperatorChaining(): Unit
}

class StreamExecutionEnvironment {
def setStreamTimeCharacteristic(characteristic: TimeCharacteristic): Unit
def getStreamTimeCharacteristic: TimeCharacteristic
}

Usage Examples:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Set parallelism for all operators
env.setParallelism(4)
// Set buffer timeout for low-latency processing
env.setBufferTimeout(0) // Flush after every record
// Configure time characteristic for event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

class StreamExecutionEnvironment {
def enableCheckpointing(interval: Long): Unit
def enableCheckpointing(interval: Long, mode: CheckpointingMode): Unit
def enableCheckpointing(interval: Long, mode: CheckpointingMode, force: Boolean): Unit
def getCheckpointConfig: CheckpointConfig
def getCheckpointingMode: CheckpointingMode
}

class StreamExecutionEnvironment {
def setStateBackend(backend: AbstractStateBackend): Unit
def getStateBackend: AbstractStateBackend
}

class StreamExecutionEnvironment {
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
def getRestartStrategy: RestartStrategyConfiguration
def setNumberOfExecutionRetries(numRetries: Int): Unit // Deprecated
def getNumberOfExecutionRetries: Int // Deprecated
}

Usage Examples:
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Enable checkpointing every 5 seconds with exactly-once mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
// Configure checkpoint settings
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setCheckpointTimeout(60000) // 1 minute timeout
checkpointConfig.setMaxConcurrentCheckpoints(1)
checkpointConfig.setMinPauseBetweenCheckpoints(500)
// Set filesystem state backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints"))
// Configure restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000)) // 3 attempts, 10s delay

class StreamExecutionEnvironment {
def addDefaultKryoSerializer[T](`type`: Class[_], serializer: T): Unit
def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_]): Unit
def registerTypeWithKryoSerializer[T](clazz: Class[_], serializer: T): Unit
def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_]): Unit
def registerType(typeClass: Class[_]): Unit
}

Usage Examples:
import com.esotericsoftware.kryo.Serializer
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Register custom type for better serialization performance
env.registerType(classOf[MyCustomClass])
// Register custom Kryo serializer
env.registerTypeWithKryoSerializer(classOf[MyClass], classOf[MyClassSerializer])

class StreamExecutionEnvironment {
def registerCachedFile(filePath: String, name: String): Unit
def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit
def getCachedFiles: java.util.Set[java.util.Map.Entry[String, DistributedCache.DistributedCacheEntry]]
}

Usage Examples:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Register a data file in distributed cache
env.registerCachedFile("/path/to/lookup-data.txt", "lookup-data")
// Register an executable file
env.registerCachedFile("/path/to/external-tool", "tool", executable = true)class StreamExecutionEnvironment {
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
def getExecutionPlan: String
def getStreamGraph: StreamGraph
}

Usage Examples:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create your streaming program
val stream = env.fromElements(1, 2, 3, 4, 5)
.map(_ * 2)
.print()
// Execute the program
env.execute("My Streaming Job")
// Get execution plan as JSON (for debugging)
val executionPlan = env.getExecutionPlan
println(executionPlan)

class StreamExecutionEnvironment {
def getConfig: ExecutionConfig
def getJavaEnv: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
}

The ExecutionConfig provides access to additional configuration options:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val config = env.getConfig
// Configure global job parameters
config.setGlobalJobParameters(new MyJobParameters())
// Configure auto-generated UIDs
config.setAutoGeneratedUidsEnabled(false)
// Configure serialization
config.enableForceKryo()
config.enableGenericTypes()

Complete Example:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.windowing.time.Time
object FlinkStreamingApp {
def main(args: Array[String]): Unit = {
// Create execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Configure the environment
env.setParallelism(4)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
// Create and process your data streams
val stream = env.socketTextStream("localhost", 9999)
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.timeWindow(Time.minutes(1))
.sum(1)
// Add sink
stream.print()
// Execute the program
env.execute("Word Count Streaming")
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-10