or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

async-operations.md, data-sources.md, function-interfaces.md, index.md, keyed-streams.md, output-operations.md, scala-extensions.md, stream-composition.md, stream-environment.md, stream-partitioning.md, stream-transformations.md, window-operations.md, windowing.md
tile.json

docs/stream-environment.md

Stream Environment and Execution

The StreamExecutionEnvironment is the main entry point for all Flink streaming programs. It provides the context for creating data streams and configuring the runtime behavior of streaming applications.

Creating Execution Environments

Factory Methods

object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(): StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createLocalEnvironmentWithWebUI(config: Configuration): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
  def setDefaultLocalParallelism(parallelism: Int): Unit
  def getDefaultLocalParallelism: Int
}

Usage Examples:

import org.apache.flink.streaming.api.scala._

// Get context-appropriate environment (local or cluster)
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create local environment with specific parallelism
val localEnv = StreamExecutionEnvironment.createLocalEnvironment(4)

// Create remote cluster environment
val remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
  "localhost", 8081, "/path/to/your-program.jar"
)

Configuration Methods

Parallelism Configuration

class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def setMaxParallelism(maxParallelism: Int): Unit
  def getMaxParallelism: Int
}

Buffer and Latency Configuration

class StreamExecutionEnvironment {
  def setBufferTimeout(timeoutMillis: Long): Unit
  def getBufferTimeout: Long
  def disableOperatorChaining(): Unit
}

Time Characteristics

class StreamExecutionEnvironment {
  def setStreamTimeCharacteristic(characteristic: TimeCharacteristic): Unit
  def getStreamTimeCharacteristic: TimeCharacteristic
}

Usage Examples:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Set parallelism for all operators
env.setParallelism(4)

// Set buffer timeout for low-latency processing
env.setBufferTimeout(0)  // Flush after every record

// Configure time characteristic for event time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Checkpointing and Fault Tolerance

Checkpointing Configuration

class StreamExecutionEnvironment {
  def enableCheckpointing(interval: Long): Unit
  def enableCheckpointing(interval: Long, mode: CheckpointingMode): Unit
  def enableCheckpointing(interval: Long, mode: CheckpointingMode, force: Boolean): Unit
  def getCheckpointConfig: CheckpointConfig
  def getCheckpointingMode: CheckpointingMode
}

State Backend Configuration

class StreamExecutionEnvironment {
  def setStateBackend(backend: AbstractStateBackend): Unit
  def getStateBackend: AbstractStateBackend
}

Restart Strategy Configuration

class StreamExecutionEnvironment {
  def setRestartStrategy(restartStrategyConfiguration: RestartStrategyConfiguration): Unit
  def getRestartStrategy: RestartStrategyConfiguration
  def setNumberOfExecutionRetries(numRetries: Int): Unit  // Deprecated
  def getNumberOfExecutionRetries: Int  // Deprecated
}

Usage Examples:

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.runtime.state.filesystem.FsStateBackend

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Enable checkpointing every 5 seconds with exactly-once mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

// Configure checkpoint settings
val checkpointConfig = env.getCheckpointConfig
checkpointConfig.setCheckpointTimeout(60000)  // 1 minute timeout
checkpointConfig.setMaxConcurrentCheckpoints(1)
checkpointConfig.setMinPauseBetweenCheckpoints(500)

// Set filesystem state backend
env.setStateBackend(new FsStateBackend("hdfs://namenode:port/flink-checkpoints"))

// Configure restart strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))  // 3 attempts, 10s delay

Type System Configuration

Kryo Serialization

class StreamExecutionEnvironment {
  def addDefaultKryoSerializer[T](`type`: Class[_], serializer: T): Unit
  def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_]): Unit
  def registerTypeWithKryoSerializer[T](clazz: Class[_], serializer: T): Unit
  def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_]): Unit
  def registerType(typeClass: Class[_]): Unit
}

Usage Examples:

import com.esotericsoftware.kryo.Serializer

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Register custom type for better serialization performance
env.registerType(classOf[MyCustomClass])

// Register custom Kryo serializer
env.registerTypeWithKryoSerializer(classOf[MyClass], classOf[MyClassSerializer])

Distributed Cache

Cache File Registration

class StreamExecutionEnvironment {
  def registerCachedFile(filePath: String, name: String): Unit
  def registerCachedFile(filePath: String, name: String, executable: Boolean): Unit
  def getCachedFiles: java.util.Set[java.util.Map.Entry[String, DistributedCache.DistributedCacheEntry]]
}

Usage Examples:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Register a data file in distributed cache
env.registerCachedFile("/path/to/lookup-data.txt", "lookup-data")

// Register an executable file
env.registerCachedFile("/path/to/external-tool", "tool", executable = true)

Program Execution

Execution Methods

class StreamExecutionEnvironment {
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
  def getExecutionPlan: String
  def getStreamGraph: StreamGraph
}

Usage Examples:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create your streaming program
val stream = env.fromElements(1, 2, 3, 4, 5)
  .map(_ * 2)
  .print()

// Execute the program
env.execute("My Streaming Job")

// Get execution plan as JSON (for debugging)
val executionPlan = env.getExecutionPlan
println(executionPlan)

Configuration Access

Environment Configuration

class StreamExecutionEnvironment {
  def getConfig: ExecutionConfig
  def getJavaEnv: org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
}

The ExecutionConfig provides access to additional configuration options:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val config = env.getConfig

// Configure global job parameters
config.setGlobalJobParameters(new MyJobParameters())

// Configure auto-generated UIDs
config.setAutoGeneratedUidsEnabled(false)

// Configure serialization
config.enableForceKryo()
config.enableGenericTypes()

Complete Example

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.windowing.time.Time

object FlinkStreamingApp {
  def main(args: Array[String]): Unit = {
    // Create execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    // Configure the environment
    env.setParallelism(4)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000))
    
    // Create and process your data streams
    val stream = env.socketTextStream("localhost", 9999)
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.minutes(1))
      .sum(1)
    
    // Add sink
    stream.print()
    
    // Execute the program
    env.execute("Word Count Streaming")
  }
}