The StreamExecutionEnvironment is the main entry point for creating Flink streaming applications. It provides methods for creating data sources, configuring execution parameters, and managing the job lifecycle.
Factory methods for creating different types of execution environments.
/**
* Factory methods for obtaining a StreamExecutionEnvironment:
* context-dependent, local (default or explicit parallelism), or remote
*/
object StreamExecutionEnvironment {
def getExecutionEnvironment: StreamExecutionEnvironment
def createLocalEnvironment(): StreamExecutionEnvironment
def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}

Usage Examples:
import org.apache.flink.streaming.api.scala._
// Get execution environment (local or cluster depending on context)
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Force local execution
val localEnv = StreamExecutionEnvironment.createLocalEnvironment()
// Remote cluster execution
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "path/to/job.jar")

Methods for configuring job execution parameters.
/**
* Configure parallelism, runtime execution mode, buffer timeout, and operator chaining
*/
class StreamExecutionEnvironment {
def setParallelism(parallelism: Int): Unit
def getParallelism: Int
def setMaxParallelism(maxParallelism: Int): Unit
def getMaxParallelism: Int
def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment
def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment
def getBufferTimeout: Long
def disableOperatorChaining(): StreamExecutionEnvironment
}

Enable and configure checkpointing for fault tolerance.
/**
* Configure checkpointing for fault tolerance
*/
class StreamExecutionEnvironment {
def enableCheckpointing(interval: Long): StreamExecutionEnvironment
def enableCheckpointing(interval: Long, mode: CheckpointingMode): StreamExecutionEnvironment
def getCheckpointConfig: CheckpointConfig
}

Usage Examples:
// Enable checkpointing every 5 seconds
env.enableCheckpointing(5000)
// Configure checkpoint mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
// Advanced checkpoint configuration
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setMinPauseBetweenCheckpoints(500)
checkpointConfig.setCheckpointTimeout(60000)

Methods for creating data streams from various sources.
/**
* Create data streams from various sources
*/
class StreamExecutionEnvironment {
def fromCollection[T](data: Seq[T]): DataStream[T]
def fromElements[T](data: T*): DataStream[T]
def fromParallelCollection[T](data: SplittableIterator[T]): DataStream[T]
def generateSequence(from: Long, to: Long): DataStream[Long]
def readTextFile(filePath: String): DataStream[String]
def readFile[T](inputFormat: FileInputFormat[T], filePath: String): DataStream[T]
def socketTextStream(hostname: String, port: Int): DataStream[String]
def addSource[T](function: SourceFunction[T]): DataStream[T]
def fromSource[T](source: Source[T, _, _], watermarkStrategy: WatermarkStrategy[T], sourceName: String): DataStream[T]
}

Usage Examples:
// Create from collection
val dataStream = env.fromCollection(List(1, 2, 3, 4, 5))
// Create from elements
val elementsStream = env.fromElements("hello", "world", "flink")
// Read from file
val fileStream = env.readTextFile("path/to/input.txt")
// Socket stream for testing
val socketStream = env.socketTextStream("localhost", 9999)
// Custom source function
val customStream = env.addSource(new MyCustomSourceFunction())

Methods for executing the streaming job.
/**
* Execute the streaming job
*/
class StreamExecutionEnvironment {
def execute(): JobExecutionResult
def execute(jobName: String): JobExecutionResult
def executeAsync(): JobClient
def executeAsync(jobName: String): JobClient
def getStreamGraph: StreamGraph
def getStreamGraph(jobName: String): StreamGraph
}

Usage Examples:
// Execute with default job name
env.execute()
// Execute with custom job name
env.execute("My Streaming Job")
// Async execution for non-blocking operation
val jobClient = env.executeAsync("Async Job")

Configure state backends for stateful operations.
/**
* Configure state backend
*/
class StreamExecutionEnvironment {
def setStateBackend(backend: StateBackend): StreamExecutionEnvironment
def getStateBackend: StateBackend
}

Access to execution configuration and cached files.
/**
* Access configuration and resources
*/
class StreamExecutionEnvironment {
def getConfig: ExecutionConfig
def getCachedFiles: Map[String, URI]
def registerCachedFile(filePath: String, name: String): Unit
def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit
}

// Main environment class
class StreamExecutionEnvironment(javaEnv: JavaEnv)
// Execution modes and configuration
enum RuntimeExecutionMode {
STREAMING, BATCH, AUTOMATIC
}
enum CheckpointingMode {
EXACTLY_ONCE, AT_LEAST_ONCE
}
// Job execution results
trait JobExecutionResult
trait JobClient
// Configuration classes
class ExecutionConfig
class CheckpointConfig
class StreamGraph