CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-streaming-scala-2-12

Scala API for Apache Flink streaming applications providing idiomatic Scala bindings for building high-throughput, low-latency stream processing applications.

Pending
Overview
Eval results
Files

docs/data-streams.md

Data Sources and Streams

DataStream represents the core abstraction for processing streams of data in Flink. It provides a rich set of transformation operations while maintaining type safety through Scala's type system.

Capabilities

Stream Properties and Configuration

Access stream metadata and configure stream behavior.

class DataStream[T] {
  /**
   * Get the type information for stream elements
   * @return TypeInformation for type T
   */
  def dataType: TypeInformation[T]
  
  /**
   * Get the execution environment associated with this stream
   * @return StreamExecutionEnvironment instance
   */
  def executionEnvironment: StreamExecutionEnvironment
  
  /**
   * Get the current parallelism for this stream
   * @return Current parallelism degree
   */
  def parallelism: Int
  
  /**
   * Set the parallelism for this operation
   * @param parallelism Parallelism degree
   * @return New DataStream with specified parallelism
   */
  def setParallelism(parallelism: Int): DataStream[T]
  
  /**
   * Set the maximum parallelism for this operation
   * @param maxParallelism Maximum parallelism degree
   * @return New DataStream with specified max parallelism
   */
  def setMaxParallelism(maxParallelism: Int): DataStream[T]
  
  /**
   * Set a name for this operation
   * @param name Operator name
   * @return New DataStream with specified name
   */
  def name(name: String): DataStream[T]
  
  /**
   * Set a unique identifier for this operation
   * @param uid Unique identifier
   * @return New DataStream with specified UID
   */
  def uid(uid: String): DataStream[T]
}

Basic Transformations

Core transformation operations for modifying stream elements.

class DataStream[T] {
  /**
   * Apply a function to each element in the stream
   * @param fun Mapping function from T to R
   * @return DataStream of mapped elements
   */
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  
  /**
   * Apply a MapFunction to each element
   * @param mapper MapFunction implementation
   * @return DataStream of mapped elements  
   */
  def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R]
  
  /**
   * Apply a function that returns multiple elements for each input
   * @param fun Function returning TraversableOnce of R
   * @return DataStream of flattened results
   */
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  
  /**
   * Apply a FlatMapFunction that outputs to a Collector
   * @param fun Function that outputs to Collector
   * @return DataStream of collected results
   */
  def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R]
  
  /**
   * Filter elements based on a predicate
   * @param fun Predicate function returning Boolean
   * @return DataStream of filtered elements
   */
  def filter(fun: T => Boolean): DataStream[T]
  
  /**
   * Filter elements using a FilterFunction
   * @param filter FilterFunction implementation
   * @return DataStream of filtered elements
   */
  def filter(filter: FilterFunction[T]): DataStream[T]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val numbers = env.fromElements(1, 2, 3, 4, 5)

// Map transformation
val doubled = numbers.map(_ * 2)

// FlatMap transformation
val words = env.fromElements("hello world", "scala flink")
  .flatMap(_.split(" "))

// Filter transformation  
val evenNumbers = numbers.filter(_ % 2 == 0)

// Chaining transformations
val result = numbers
  .filter(_ > 2)
  .map(_ * 3)
  .filter(_ < 15)

Stream Partitioning

Control how stream elements are distributed across parallel instances.

class DataStream[T] {
  /**
   * Partition by key using a key selector function
   * @param fun Key selector function
   * @return KeyedStream partitioned by the key
   */
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
  
  /**
   * Partition by key using a KeySelector
   * @param fun KeySelector implementation
   * @return KeyedStream partitioned by the key
   */
  def keyBy[K: TypeInformation](fun: KeySelector[T, K]): KeyedStream[T, K]
  
  /**
   * Custom partitioning using a Partitioner
   * @param partitioner Custom partitioner implementation
   * @param fun Key selector for partitioning
   * @return DataStream with custom partitioning
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]
  
  /**
   * Broadcast all elements to all downstream operators
   * @return DataStream with broadcast partitioning
   */
  def broadcast: DataStream[T]
  
  /**
   * Round-robin distribution across parallel instances
   * @return DataStream with rebalanced partitioning
   */
  def rebalance: DataStream[T]
  
  /**
   * Local round-robin within the same TaskManager
   * @return DataStream with rescaled partitioning
   */
  def rescale: DataStream[T]
  
  /**
   * Random distribution across parallel instances
   * @return DataStream with shuffle partitioning
   */
  def shuffle: DataStream[T]
  
  /**
   * Forward elements to next operator (no redistribution)
   * @return DataStream with forward partitioning
   */
  def forward: DataStream[T]
  
  /**
   * Send all elements to the first parallel instance
   * @return DataStream with global partitioning
   */
  def global: DataStream[T]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._

case class User(id: Int, name: String, department: String)

val users = env.fromElements(
  User(1, "Alice", "Engineering"),
  User(2, "Bob", "Sales"),
  User(3, "Charlie", "Engineering")
)

// Key by user department
val usersByDept = users.keyBy(_.department)

// Key by user ID
val usersById = users.keyBy(_.id)

// Rebalance for load distribution
val balanced = users.rebalance

// Broadcast to all downstream operators
val broadcast = users.broadcast

Stream Union and Connections

Combine multiple streams into unified processing pipelines.

class DataStream[T] {
  /**
   * Union this stream with other streams of the same type
   * @param dataStreams Other streams to union with
   * @return DataStream containing elements from all input streams
   */
  def union(dataStreams: DataStream[T]*): DataStream[T]
  
  /**
   * Connect this stream with another stream of different type
   * @param dataStream Stream to connect with
   * @return ConnectedStreams for co-processing
   */
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  
  /**
   * Connect with a broadcast stream for broadcast state
   * @param broadcastStream Broadcast stream to connect with
   * @return BroadcastConnectedStream for broadcast processing
   */
  def connect[R](broadcastStream: BroadcastStream[R]): BroadcastConnectedStream[T, R]
}

Windowing (All-Window Operations)

Apply windowing operations on non-keyed streams.

class DataStream[T] {
  /**
   * Apply time-based tumbling windowing to all elements (deprecated)
   * @param size Window size
   * @return AllWindowedStream for aggregations
   */
  @deprecated("Use windowAll(TumblingEventTimeWindows.of(size))", "1.12.0")
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
  
  /**
   * Apply time-based sliding windowing to all elements (deprecated)
   * @param size Window size
   * @param slide Slide interval
   * @return AllWindowedStream for aggregations
   */
  @deprecated("Use windowAll(SlidingEventTimeWindows.of(size, slide))", "1.12.0")
  def timeWindowAll(size: Time, slide: Time): AllWindowedStream[T, TimeWindow]
  
  /**
   * Apply count-based windowing to all elements
   * @param size Window size (number of elements)
   * @return AllWindowedStream for aggregations
   */
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
  
  /**
   * Apply sliding count-based windowing to all elements
   * @param size Window size (number of elements)  
   * @param slide Slide size (number of elements)
   * @return AllWindowedStream for aggregations
   */
  def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow]
  
  /**
   * Apply custom windowing to all elements
   * @param assigner Window assigner implementation
   * @return AllWindowedStream for aggregations
   */
  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W]
}

Time and Watermarks

Configure event time processing and watermark generation.

class DataStream[T] {
  /**
   * Assign timestamps and watermarks using a WatermarkStrategy
   * @param watermarkStrategy Strategy for timestamp and watermark assignment
   * @return DataStream with assigned timestamps
   */
  def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T]
  
  /**
   * Assign ascending timestamps (deprecated)
   * @param extractor Function to extract timestamps
   * @return DataStream with assigned timestamps
   */
  def assignAscendingTimestamps(extractor: T => Long): DataStream[T]
}

Usage Examples:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import java.time.Duration

case class Event(id: String, timestamp: Long, value: Double)

val events = env.fromElements(
  Event("A", 1000L, 1.0),
  Event("B", 2000L, 2.0),
  Event("C", 3000L, 3.0)
)

// Assign watermarks for event time processing
val eventsWithWatermarks = events
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[Event](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
        override def extractTimestamp(element: Event, recordTimestamp: Long): Long = 
          element.timestamp
      })
  )

Iterations

Create iterative processing patterns for complex algorithms.

class DataStream[T] {
  /**
   * Create an iteration with feedback loop
   * @param stepFunction Function defining iteration step
   * @param maxWaitTimeMillis Maximum wait time for iteration
   * @return DataStream with iteration results
   */
  def iterate[R](
    stepFunction: DataStream[T] => (DataStream[T], DataStream[R]), 
    maxWaitTimeMillis: Long = 0
  ): DataStream[R]
  
  /**
   * Create an iteration with connected streams
   * @param stepFunction Function with connected streams step
   * @param maxWaitTimeMillis Maximum wait time for iteration
   * @return DataStream with iteration results
   */
  def iterate[R, F: TypeInformation](
    stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]), 
    maxWaitTimeMillis: Long
  ): DataStream[R]
}

Processing Functions

Apply custom processing logic with access to runtime context.

class DataStream[T] {
  /**
   * Apply a ProcessFunction for low-level processing
   * @param processFunction ProcessFunction implementation
   * @return DataStream with processed results
   */
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}

Side Outputs

Extract additional output streams from processing functions.

class DataStream[T] {
  /**
   * Get a side output stream by tag
   * @param tag OutputTag identifying the side output
   * @return DataStream of side output elements
   */
  def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X]
}

Advanced Operations

Low-level operations for custom stream processing.

class DataStream[T] {
  /**
   * Apply a custom stream operator
   * @param operatorName Name for the operator
   * @param operator Custom operator implementation
   * @return DataStream with custom transformation
   */
  def transform[R: TypeInformation](
    operatorName: String, 
    operator: OneInputStreamOperator[T, R]
  ): DataStream[R]
  
  /**
   * Cache this stream for reuse in multiple downstream operations
   * @return CachedDataStream for reuse
   */
  def cache(): CachedDataStream[T]
}

Types

// Core function interfaces
trait MapFunction[T, R] {
  def map(value: T): R
}

trait FlatMapFunction[T, R] {
  def flatMap(value: T, out: Collector[R]): Unit
}

trait FilterFunction[T] {
  def filter(value: T): Boolean
}

// Key selector interface
trait KeySelector[T, K] {
  def getKey(value: T): K
}

// Partitioner interface
trait Partitioner[K] {
  def partition(key: K, numPartitions: Int): Int
}

// Collector interface for output
trait Collector[T] {
  def collect(record: T): Unit
  def close(): Unit
}

// Output tag for side outputs
case class OutputTag[T: TypeInformation](id: String) {
  def getTypeInfo: TypeInformation[T]
}

// Cached data stream
class CachedDataStream[T](dataStream: DataStream[T]) extends DataStream[T] {
  def invalidateCache(): Unit
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-streaming-scala-2-12

docs

async-io.md

data-streams.md

execution-environment.md

index.md

keyed-streams.md

processing-functions.md

sinks-output.md

stream-connections.md

window-functions.md

windowing.md

tile.json