The StreamExecutionEnvironment is the main entry point for all Flink streaming programs. It provides the context for creating data streams and configuring the runtime behavior of streaming applications.
object StreamExecutionEnvironment {
def getExecutionEnvironment: StreamExecutionEnvironment
def createLocalEnvironment(): StreamExecutionEnvironment
def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
def createLocalEnvironmentWithWebUI(config: Configuration): StreamExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
def setDefaultLocalParallelism(parallelism: Int): Unit
def getDefaultLocalParallelism: Int
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
// Get context-appropriate environment (local or cluster)
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create local environment with specific parallelism
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)
// Create remote cluster environment
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
"localhost", 8081, "/path/to/your-program.jar"
)
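createLocalEnvironmentWithWebUI is listed above but not shown in the examples. A minimal sketch; it assumes the flink-runtime-web dependency is on the classpath, and the port override is purely illustrative:
import org.apache.flink.configuration.{Configuration, RestOptions}
// Local environment that also serves the Flink web UI (default port 8081)
val conf = new Configuration()
conf.setInteger(RestOptions.PORT, 8082) // illustrative port override
val uiEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)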
class StreamExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
def setMaxParallelism(maxParallelism: Int): Unit
def getMaxParallelism: Int
}

class StreamExecutionEnvironment {
def setBufferTimeout(timeoutMillis: Long): Unit
def getBufferTimeout: Long
def disableOperatorChaining(): Unit
}

class StreamExecutionEnvironment {
def setStreamTimeCharacteristic(characteristic: TimeCharacteristic): Unit
def getStreamTimeCharacteristic: TimeCharacteristic
}

Usage Examples:
import org.apache.flink.streaming.api.TimeCharacteristic
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Set parallelism for all operators
env.setParallelism(4)
// Set buffer timeout for low-latency processing
env.setBufferTimeout(0) // Flush after every record
// Configure time characteristic for event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
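setMaxParallelism and disableOperatorChaining are declared above but not exercised in the example; a brief sketch with illustrative values:
// Cap the key-group range so the job can later be rescaled up to 128
env.setMaxParallelism(128)
// Disable operator chaining, e.g. to isolate operators while debugging
env.disableOperatorChaining()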
class StreamExecutionEnvironment {
def enableCheckpointing(interval: Long): Unit
def enableCheckpointing(interval: Long, mode: CheckpointingMode): Unit
def enableCheckpointing(interval: Long, mode: CheckpointingMode, force: Boolean): Unit
def getCheckpointConfig: CheckpointConfig
def getCheckpointingMode: CheckpointingMode
}

class StreamExecutionEnvironment {
def setStateBackend(backend: AbstractStateBackend): Unit
def getStateBackend: AbstractStateBackend
}

class StreamExecutionEnvironment {
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
def getRestartStrategy: RestartStrategyConfiguration
def setNumberOfExecutionRetries(numRetries: Int): Unit // Deprecated
def getNumberOfExecutionRetries: Int // Deprecated
}

Usage Examples:
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Enable checkpointing every 5 seconds with exactly-once mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
// Configure checkpoint settings
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setCheckpointTimeout(60000) // 1 minute timeout
checkpointConfig.setMaxConcurrentCheckpoints(1)
checkpointConfig.setMinPauseBetweenCheckpoints(500)
// Set filesystem state backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints"))
// Configure restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000)) // 3 attempts, 10s delay
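RestartStrategies also provides a failure-rate strategy; a sketch with illustrative values:
import org.apache.flink.api.common.time.Time
// Restart as long as at most 3 failures occur within a 5-minute window,
// waiting 10 seconds between restart attempts
env.setRestartStrategy(RestartStrategies.failureRateRestart(
  3,               // max failures per interval
  Time.minutes(5), // failure measurement interval
  Time.seconds(10) // delay between restart attempts
))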
class StreamExecutionEnvironment {
def addDefaultKryoSerializer[T <: Serializer[_] with Serializable](`type`: Class[_], serializer: T): Unit
def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_ <: Serializer[_]]): Unit
def registerTypeWithKryoSerializer[T <: Serializer[_] with Serializable](clazz: Class[_], serializer: T): Unit
def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]): Unit
def registerType(typeClass: Class[_]): Unit
}

Usage Examples:
import com.esotericsoftware.kryo.Serializer
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Register custom type for better serialization performance
env.registerType(classOf[MyCustomClass])
// Register custom Kryo serializer
env.registerTypeWithKryoSerializer(classOf[MyClass], classOf[MyClassSerializer])
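MyClass and MyClassSerializer above are placeholders. A hypothetical sketch of such a pair, assuming MyClass holds an Int and a String:
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
// Hypothetical user type
class MyClass(val a: Int, val b: String)
// Hypothetical Kryo serializer; registering by class (as above) needs a
// no-arg constructor, registering an instance requires Serializable
class MyClassSerializer extends Serializer[MyClass] with Serializable {
  override def write(kryo: Kryo, output: Output, obj: MyClass): Unit = {
    output.writeInt(obj.a)
    output.writeString(obj.b)
  }
  override def read(kryo: Kryo, input: Input, clazz: Class[MyClass]): MyClass =
    new MyClass(input.readInt(), input.readString())
}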
class StreamExecutionEnvironment {
def registerCachedFile(filePath: String, name: String): Unit
def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit
def getCachedFiles: java.util.Set[java.util.Map.Entry[String, DistributedCache.DistributedCacheEntry]]
}

Usage Examples:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Register a data file in distributed cache
env.registerCachedFile("/path/to/lookup-data.txt", "lookup-data")
// Register an executable file
env.registerCachedFile("/path/to/external-tool", "tool", executable = true)
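On the workers, a registered file is retrieved by name through the runtime context of a rich function. A sketch, assuming the lookup file holds comma-separated key/value pairs and the stream carries String keys (both assumptions):
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import scala.io.Source
// Hypothetical enrichment function that loads the cached "lookup-data" file
class LookupEnricher extends RichMapFunction[String, (String, String)] {
  private var lookup: Map[String, String] = Map.empty
  override def open(parameters: Configuration): Unit = {
    // The file registered under "lookup-data" is available as a local file here
    val file = getRuntimeContext.getDistributedCache.getFile("lookup-data")
    lookup = Source.fromFile(file).getLines()
      .map { line => val Array(k, v) = line.split(","); k -> v }
      .toMap
  }
  override def map(key: String): (String, String) =
    (key, lookup.getOrElse(key, "unknown"))
}
It would be applied with something like stream.map(new LookupEnricher).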
class StreamExecutionEnvironment {
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
def getExecutionPlan: String
def getStreamGraph: StreamGraph
}

Usage Examples:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Create your streaming program
val stream = env.fromElements(1, 2, 3, 4, 5)
  .map(_ * 2)
stream.print()
// Execute the program
env.execute("My Streaming Job")
// Get execution plan as JSON (for debugging)
val executionPlan = env.getExecutionPlan
println(executionPlan)
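execute() also returns a JobExecutionResult, which is only available once the job terminates (e.g. with finite sources such as fromElements); a sketch:
// Capture the result instead of discarding it
val result = env.execute("My Streaming Job")
println(s"Job ran for ${result.getNetRuntime} ms")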
class StreamExecutionEnvironment {
def getConfig: ExecutionConfig
def getJavaEnv: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
}

The ExecutionConfig provides access to additional configuration options:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val config = env.getConfig
// Configure global job parameters
config.setGlobalJobParameters(new MyJobParameters())
// Disable auto-generated operator UIDs so every stateful operator must get an explicit uid()
config.disableAutoGeneratedUIDs()
// Configure serialization
config.enableForceKryo()
config.enableGenericTypes()
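MyJobParameters above is a placeholder; global job parameters are a user-defined subclass of ExecutionConfig.GlobalJobParameters. A hypothetical sketch (the parameter name and value are illustrative):
import org.apache.flink.api.common.ExecutionConfig
import java.util.{HashMap => JHashMap, Map => JMap}
// Hypothetical global job parameters; toMap makes them visible in the web UI
class MyJobParameters extends ExecutionConfig.GlobalJobParameters {
  override def toMap: JMap[String, String] = {
    val m = new JHashMap[String, String]()
    m.put("lookup.path", "/data/lookup") // illustrative parameter
    m
  }
}
Rich functions can read them back via getRuntimeContext.getExecutionConfig.getGlobalJobParameters.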
Complete Example:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.windowing.time.Time
object FlinkStreamingApp {
  def main(args: Array[String]): Unit = {
    // Create execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Configure the environment
    env.setParallelism(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))

    // Create and process your data streams
    val stream = env.socketTextStream("localhost", 9999)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .sum(1)

    // Add sink
    stream.print()

    // Execute the program
    env.execute("Word Count Streaming")
  }
}