Scala API for Apache Flink streaming applications, providing idiomatic Scala bindings for building high-throughput, low-latency stream-processing applications.

The StreamExecutionEnvironment is the main entry point for creating and configuring Flink streaming applications. It provides factory methods for creating environments, configuration options, and execution control.

Factory methods for creating different types of execution environments:
object StreamExecutionEnvironment {
/**
 * Creates the execution environment appropriate for the current context:
 * a local environment when the program is started standalone (e.g. from an
 * IDE), or the cluster's environment when the job is submitted via a Flink
 * client.
 * @return a context-appropriate StreamExecutionEnvironment instance
 */
def getExecutionEnvironment: StreamExecutionEnvironment
/**
 * Creates a local execution environment that runs the job in the current JVM.
 * @param parallelism The parallelism for local execution; defaults to
 *                    getDefaultLocalParallelism when omitted
 * @return Local StreamExecutionEnvironment
 */
def createLocalEnvironment(parallelism: Int = getDefaultLocalParallelism): StreamExecutionEnvironment
/**
 * Creates a local environment that also starts the Flink web UI for
 * monitoring the running job.
 * NOTE(review): a `null` default is un-idiomatic Scala — an Option[Configuration]
 * or an empty Configuration would be preferable; signature kept for compatibility.
 * @param config Optional configuration; may be null, in which case defaults apply
 * @return Local environment with web UI enabled
 */
def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment
/**
 * Creates an environment that submits the job to a remote cluster.
 * @param host Remote JobManager host
 * @param port Remote JobManager port
 * @param jarFiles JAR files containing the user code, shipped with the job
 * @return Remote StreamExecutionEnvironment
 */
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
/**
 * Creates a remote execution environment with an explicit default parallelism.
 * @param host Remote JobManager host
 * @param port Remote JobManager port
 * @param parallelism Default parallelism for the submitted job
 * @param jarFiles JAR files containing the user code, shipped with the job
 * @return Remote StreamExecutionEnvironment
 */
def createRemoteEnvironment(host: String, port: Int, parallelism: Int, jarFiles: String*): StreamExecutionEnvironment
}Usage Examples:
import org.apache.flink.streaming.api.scala._
// Automatic environment detection (recommended): local when run standalone,
// the cluster's environment when submitted via a Flink client
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Local environment for testing, with an explicit parallelism of 4
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)
// Remote environment for cluster submission; the JAR carries the job's user code
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
"jobmanager-host", 8081, "my-application.jar"
)Control the parallelism settings for stream operations.
class StreamExecutionEnvironment {
/**
 * Sets the default parallelism used by all operators that do not specify
 * their own parallelism explicitly.
 * @param parallelism The parallelism degree (number of parallel subtasks)
 */
def setParallelism(parallelism: Int): Unit
/**
 * Sets the maximum parallelism — the upper bound to which operator
 * parallelism can later be scaled.
 * @param maxParallelism The maximum parallelism degree
 */
def setMaxParallelism(maxParallelism: Int): Unit
/**
 * Gets the default parallelism configured on this environment.
 * @return Current parallelism setting
 */
def getParallelism: Int
/**
 * Gets the maximum parallelism configured on this environment.
 * @return Current maximum parallelism setting
 */
def getMaxParallelism: Int
}Configure runtime behavior and execution modes.
class StreamExecutionEnvironment {
/**
 * Sets the runtime execution mode: STREAMING, BATCH, or AUTOMATIC
 * (see RuntimeExecutionMode below).
 * @param executionMode The runtime execution mode
 * @return This environment for chaining
 */
def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment
/**
 * Sets the maximum time network buffers may be held back before being
 * flushed downstream; lower values reduce latency at some throughput cost.
 * @param timeoutMillis Timeout in milliseconds
 * @return This environment for chaining
 */
def setBufferTimeout(timeoutMillis: Long): StreamExecutionEnvironment
/**
 * Gets the buffer timeout.
 * @return Current buffer timeout in milliseconds
 */
def getBufferTimeout: Long
/**
 * Disables operator chaining for the whole job, so every operator runs as
 * its own task; mainly useful for debugging.
 * @return This environment for chaining
 */
def disableOperatorChaining(): StreamExecutionEnvironment
}Enable and configure checkpointing for fault tolerance.
class StreamExecutionEnvironment {
/**
 * Enables periodic checkpointing with the default checkpointing mode.
 * @param interval Checkpoint interval in milliseconds
 * @return This environment for chaining
 */
def enableCheckpointing(interval: Long): StreamExecutionEnvironment
/**
 * Enables periodic checkpointing with an explicit consistency mode.
 * @param interval Checkpoint interval in milliseconds
 * @param mode Checkpointing mode (EXACTLY_ONCE or AT_LEAST_ONCE,
 *             see CheckpointingMode below)
 * @return This environment for chaining
 */
def enableCheckpointing(interval: Long, mode: CheckpointingMode): StreamExecutionEnvironment
/**
 * Gets the checkpoint configuration for fine-tuning (timeouts, pauses
 * between checkpoints, etc.).
 * @return CheckpointConfig for fine-tuning
 */
def getCheckpointConfig: CheckpointConfig
/**
 * Sets the state backend that stores operator/keyed state and checkpoints.
 * @param backend The state backend implementation
 * @return This environment for chaining
 */
def setStateBackend(backend: StateBackend): StreamExecutionEnvironment
}Usage Examples:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.core.execution.CheckpointingMode
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Take a checkpoint every 5 seconds with exactly-once semantics
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
// Fine-tune checkpoint behavior through the CheckpointConfig
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setMinPauseBetweenCheckpoints(500) // at least 500 ms between checkpoints
checkpointConfig.setCheckpointTimeout(60000)Configure failure recovery behavior.
class StreamExecutionEnvironment {
/**
 * Sets the restart strategy that governs how the job recovers from failures
 * (e.g. fixed-delay or failure-rate restarts).
 * @param restartStrategyConfiguration Restart strategy configuration
 */
def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
/**
 * Gets the restart strategy currently configured on this environment.
 * @return Current restart strategy configuration
 */
def getRestartStrategy: RestartStrategyConfiguration
}Create data sources for streaming applications.
class StreamExecutionEnvironment {
/**
 * Creates a DataStream from a fixed sequence of elements.
 * @tparam T Element type; requires an implicit TypeInformation
 * @param data Elements to include in the stream
 * @return DataStream containing the elements
 */
def fromElements[T: TypeInformation](data: T*): DataStream[T]
/**
 * Creates a DataStream from a collection.
 * @tparam T Element type; requires an implicit TypeInformation
 * @param data Collection to convert to a stream
 * @return DataStream containing the collection's elements
 */
def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
/**
 * Creates a DataStream from an iterator (overload of fromCollection).
 * @tparam T Element type; requires an implicit TypeInformation
 * @param data Iterator to convert to a stream
 * @return DataStream containing the iterator's elements
 */
def fromCollection[T: TypeInformation](data: Iterator[T]): DataStream[T]
/**
 * Creates a DataStream containing the numbers from `from` to `to`.
 * @param from Starting number (inclusive)
 * @param to Ending number (inclusive)
 * @return DataStream of Long numbers
 */
def fromSequence(from: Long, to: Long): DataStream[Long]
/**
 * Reads a text file and emits it line by line.
 * @param filePath Path to the text file
 * @return DataStream of file lines
 */
def readTextFile(filePath: String): DataStream[String]
/**
 * Creates a stream of text lines read from a socket.
 * @param hostname Host to connect to
 * @param port Port to connect to
 * @param delimiter Line delimiter character (defaults to '\n')
 * @param maxRetry Maximum retry attempts on connection loss (defaults to 0,
 *                 i.e. no retries)
 * @return DataStream of socket text lines
 */
def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0): DataStream[String]
/**
 * Adds a custom SourceFunction-based source. For new code, prefer
 * fromSource with the newer Source interface below.
 * @tparam T Element type; requires an implicit TypeInformation
 * @param function Source function implementation
 * @return DataStream from the source
 */
def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]
/**
 * Creates a DataStream from the new unified Source interface.
 * @tparam T Element type; requires an implicit TypeInformation
 * @param source Source implementation
 * @param watermarkStrategy Watermark strategy for event time
 * @param sourceName Name for the source operator
 * @return DataStream from the source
 */
def fromSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
watermarkStrategy: WatermarkStrategy[T],
sourceName: String
): DataStream[T]
}Execute streaming jobs and retrieve results.
class StreamExecutionEnvironment {
/**
 * Executes the job defined so far, blocking until it finishes.
 * @return JobExecutionResult with execution details
 */
def execute(): JobExecutionResult
/**
 * Executes the job with the given name, blocking until it finishes.
 * @param jobName Name for the job
 * @return JobExecutionResult with execution details
 */
def execute(jobName: String): JobExecutionResult
/**
 * Submits the job without blocking the calling thread.
 * @return JobClient for monitoring and controlling the running job
 */
def executeAsync(): JobClient
/**
 * Submits the job with the given name without blocking the calling thread.
 * @param jobName Name for the job
 * @return JobClient for monitoring and controlling the running job
 */
def executeAsync(jobName: String): JobClient
/**
 * Gets the execution plan of the defined pipeline as a JSON string.
 * @return Execution plan representation
 */
def getExecutionPlan: String
/**
 * Gets the StreamGraph built from the transformations defined so far.
 * @return StreamGraph for the defined transformations
 */
def getStreamGraph: StreamGraph
}Usage Examples:
import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Define your streaming pipeline
val stream = env.fromElements(1, 2, 3, 4, 5)
.map(_ * 2)
.print()
// Execute synchronously: blocks until the job finishes
val result = env.execute("My Streaming Job")
println(s"Job completed in ${result.getJobExecutionTime} ms")
// Or execute asynchronously and keep a JobClient handle
val jobClient = env.executeAsync("My Async Job")
// Monitor job status with jobClient

// Runtime execution modes
/** Runtime execution mode accepted by setRuntimeMode. */
sealed trait RuntimeExecutionMode
object RuntimeExecutionMode {
/** Always execute as an unbounded, continuous streaming job. */
case object STREAMING extends RuntimeExecutionMode
/** Always execute as a bounded batch job. */
case object BATCH extends RuntimeExecutionMode
/** Let Flink choose streaming or batch based on the sources' boundedness. */
case object AUTOMATIC extends RuntimeExecutionMode
}
/** Consistency guarantee accepted by enableCheckpointing(interval, mode). */
sealed trait CheckpointingMode
object CheckpointingMode {
/** Each record affects state exactly once on recovery. */
case object EXACTLY_ONCE extends CheckpointingMode
/** Records may be replayed on recovery; lower overhead than exactly-once. */
case object AT_LEAST_ONCE extends CheckpointingMode
}
// Result of a finished job, as returned by the blocking execute() calls.
trait JobExecutionResult {
/** @return Job execution time in milliseconds. */
def getJobExecutionTime: Long
/** @return The result of the accumulator with the given name, as type V. */
def getAccumulatorResult[V](accumulatorName: String): V
/** @return All accumulator results, keyed by accumulator name. */
def getAllAccumulatorResults: java.util.Map[String, AnyRef]
}
// Handle to a job submitted with executeAsync(); lets the caller monitor and
// control the job without blocking.
trait JobClient {
/** @return The unique ID of the submitted job. */
def getJobID: JobID
/** @return A future that completes with the job's current status. */
def getJobStatus: CompletableFuture[JobStatus]
/** Requests cancellation of the job. @return future tracking the request */
def cancel(): CompletableFuture[Void]
/**
 * Stops the job while taking a savepoint.
 * @param advanceToEndOfEventTime whether to advance event time to the end
 *        before stopping (flushes event-time timers)
 * @param savepointDirectory target directory for the savepoint
 * @return future completing with the path of the written savepoint
 */
def stopWithSavepoint(advanceToEndOfEventTime: Boolean, savepointDirectory: String): CompletableFuture[String]
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12