tessl/maven-org-apache-flink--flink-streaming-scala_2-10

Apache Flink Scala API for DataStream processing with type-safe, functional programming constructs for building streaming data processing applications.

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-streaming-scala_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-streaming-scala_2-10@1.3.0


Apache Flink Scala API for DataStream Processing

The Apache Flink Scala API provides a type-safe, functional interface for building streaming data processing applications. It wraps Flink's Java DataStream API in Scala-friendly constructs, letting developers combine functional programming patterns, compile-time type information, and concise syntax for real-time stream processing with exactly-once state consistency guarantees.

Package Information

  • Package Name: flink-streaming-scala_2.10
  • Package Type: Maven
  • Language: Scala 2.10
  • Maven Coordinates: org.apache.flink:flink-streaming-scala_2.10:1.3.3

Core Imports

import org.apache.flink.streaming.api.scala._

For specific functionality:

import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala.function.{WindowFunction, ProcessWindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

Basic Usage

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// Create a data stream from a collection
val stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Transform the data
val result = stream
  .filter(_ % 2 == 0)  // Filter even numbers
  .map(_ * 2)          // Double each number
  .keyBy(identity)     // Key by value
  .timeWindow(Time.seconds(5))  // 5-second tumbling windows
  .sum(0)              // Sum values in each window

// Add a sink to print results
result.print()

// Execute the streaming program
env.execute("Basic Flink Scala Example")

Architecture

The Flink Scala API is built around several key architectural components:

  • StreamExecutionEnvironment: Entry point for creating and configuring streaming applications
  • DataStream: Core abstraction representing unbounded streams of elements with type safety
  • KeyedStream: Partitioned streams enabling stateful operations and keyed transformations
  • WindowedStream: Time or count-based partitioned streams for aggregations over bounded intervals
  • Function Interfaces: Type-safe interfaces for user-defined operations (transformations, aggregations, windows)
  • Type System Integration: Automatic TypeInformation generation via Scala macros for serialization
  • State Management: Managed state with exactly-once consistency guarantees and fault tolerance
  • Time Processing: Support for event time, processing time, and ingestion time with watermark handling

Capabilities

Stream Environment and Execution

Core functionality for creating and configuring Flink streaming applications, including environment setup, parallelism control, checkpointing, and program execution.

object StreamExecutionEnvironment {
  def getExecutionEnvironment: StreamExecutionEnvironment
  def createLocalEnvironment(): StreamExecutionEnvironment
  def createLocalEnvironment(parallelism: Int): StreamExecutionEnvironment
  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment
}

class StreamExecutionEnvironment {
  def setParallelism(parallelism: Int): Unit
  def getParallelism: Int
  def enableCheckpointing(interval: Long): StreamExecutionEnvironment
  def execute(): JobExecutionResult
  def execute(jobName: String): JobExecutionResult
}
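A minimal configuration sketch using these methods (Flink 1.3 must be on the classpath; the parallelism, checkpoint interval, and job name are illustrative):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)            // run each operator with 4 parallel subtasks
env.enableCheckpointing(10000L)  // checkpoint state every 10 seconds

// ... define sources, transformations, and sinks here ...

// Nothing runs until execute() submits the job graph to the runtime
env.execute("configured-job")
```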


Data Sources and Stream Creation

Comprehensive functionality for creating DataStreams from various sources including collections, files, sockets, and custom source functions.

class StreamExecutionEnvironment {
  def fromElements[T: TypeInformation](data: T*): DataStream[T]
  def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T]
  def readTextFile(filePath: String): DataStream[String]
  def socketTextStream(hostname: String, port: Int): DataStream[String]
  def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T]
  def generateSequence(from: Long, to: Long): DataStream[Long]
}
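The source methods above can be sketched as follows; the file path and the socket host/port are placeholders:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// In-memory collections: handy for tests and examples
val nums: DataStream[Int] = env.fromCollection(Seq(1, 2, 3))

// One String element per line of the file
val lines: DataStream[String] = env.readTextFile("/tmp/input.txt")

// Text received on a TCP socket, split on newlines
val live: DataStream[String] = env.socketTextStream("localhost", 9999)

// A bounded stream of Longs from 1 to 100
val range: DataStream[Long] = env.generateSequence(1L, 100L)
```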


Stream Transformations and Operations

Core stream processing operations including element-wise transformations, filtering, and stateful processing with type-safe functional interfaces.

class DataStream[T] {
  def map[R: TypeInformation](fun: T => R): DataStream[R]
  def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R]
  def filter(fun: T => Boolean): DataStream[T]
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
}
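A word-normalization sketch combining these operations (the input strings are illustrative):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val linesIn: DataStream[String] = env.fromElements("To be", "or not")

// flatMap fans each line out into words, map normalizes case,
// filter keeps only tokens longer than one character
val tokens: DataStream[String] = linesIn
  .flatMap(_.split("\\s+"))
  .map(_.toLowerCase)
  .filter(_.length > 1)
```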


Stream Partitioning and Distribution

Functionality for controlling data distribution across parallel operators including key-based partitioning, broadcasting, and custom partitioning strategies.

class DataStream[T] {
  def keyBy[K: TypeInformation](fun: T => K): KeyedStream[T, K]
  def keyBy(fields: Int*): KeyedStream[T, _]
  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, _]
  def broadcast: DataStream[T]
  def shuffle: DataStream[T]
  def rebalance: DataStream[T]
}
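A sketch contrasting the partitioning choices (the tuple data is illustrative):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(("a", 1), ("b", 2), ("a", 3))

// Hash-partition by key: all records with the same first field
// reach the same parallel subtask, enabling keyed state
val keyed: KeyedStream[(String, Int), String] = events.keyBy(_._1)

// Round-robin redistribution to even out skew
val balanced = events.rebalance

// Replicate every record to every downstream subtask
val everywhere = events.broadcast
```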


Keyed Streams and Stateful Processing

Advanced operations on partitioned streams including stateful transformations, aggregations, and state management with exactly-once consistency.

class KeyedStream[T, K] {
  def reduce(fun: (T, T) => T): DataStream[T]
  def process[R: TypeInformation](processFunction: ProcessFunction[T, R]): DataStream[R]
  def mapWithState[R: TypeInformation, S: TypeInformation](fun: (T, Option[S]) => (R, Option[S])): DataStream[R]
  def sum(position: Int): DataStream[T]
  def min(position: Int): DataStream[T]
  def max(position: Int): DataStream[T]
}
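A running-sum-per-key sketch using mapWithState: the Option[Int] passed in is the previous sum stored for the current key (None on the first element), and the returned Option is written back to state:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val events = env.fromElements(("a", 1), ("a", 2), ("b", 5))

val runningSums: DataStream[(String, Int)] = events
  .keyBy(_._1)
  .mapWithState[(String, Int), Int] { (in, prev) =>
    val sum = prev.getOrElse(0) + in._2   // fold the new value into keyed state
    ((in._1, sum), Some(sum))             // emit the result, store the updated sum
  }
```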


Windowing and Time-Based Processing

Comprehensive windowing functionality for both keyed and non-keyed streams, including time-based and count-based windows with custom triggers and evictors.

class KeyedStream[T, K] {
  def timeWindow(size: Time): WindowedStream[T, K, TimeWindow]
  def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow]
  def countWindow(size: Long): WindowedStream[T, K, GlobalWindow]
  def window[W <: Window](assigner: WindowAssigner[_ >: T, W]): WindowedStream[T, K, W]
}

class DataStream[T] {
  def timeWindowAll(size: Time): AllWindowedStream[T, TimeWindow]
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow]
}
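The window variants above can be sketched as follows (sensor names and sizes are illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val readings = env.fromElements(("sensor-1", 10), ("sensor-2", 20))

val byId = readings.keyBy(_._1)

// Tumbling: non-overlapping 10-second windows per key
val tumbling = byId.timeWindow(Time.seconds(10))

// Sliding: 10-second windows, re-evaluated every 5 seconds
val sliding = byId.timeWindow(Time.seconds(10), Time.seconds(5))

// Count-based: fires once 100 elements have arrived for a key
val counted = byId.countWindow(100L)
```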


Window Operations and Aggregations

Operations that can be applied to windowed streams including built-in aggregations, custom window functions, and incremental aggregations.

class WindowedStream[T, K, W <: Window] {
  def reduce(function: (T, T) => T): DataStream[T]
  def apply[R: TypeInformation](function: WindowFunction[T, R, K, W]): DataStream[R]
  def process[R: TypeInformation](function: ProcessWindowFunction[T, R, K, W]): DataStream[R]
  def sum(position: Int): DataStream[T]
  def min(position: Int): DataStream[T]
  def max(position: Int): DataStream[T]
}
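An incremental aggregation sketch: reduce keeps only the running aggregate per window rather than buffering all elements (the data is illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Highest reading per sensor within each 10-second window
val maxima = env.fromElements(("sensor-1", 10), ("sensor-1", 25))
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .reduce((a, b) => if (a._2 >= b._2) a else b)
```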


Stream Composition and Joining

Advanced stream composition operations including union, connect, join, and co-group operations for processing multiple streams together.

class DataStream[T] {
  def union(dataStreams: DataStream[T]*): DataStream[T]
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2]
  def join[T2](otherStream: DataStream[T2]): JoinedStreams[T, T2]
  def coGroup[T2](otherStream: DataStream[T2]): CoGroupedStreams[T, T2]
}

class ConnectedStreams[IN1, IN2] {
  def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R): DataStream[R]
  def process[R: TypeInformation](coProcessFunction: CoProcessFunction[IN1, IN2, R]): DataStream[R]
}
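A connect sketch: unlike union, connect accepts streams of different types and applies one function per input, producing a single output stream of a common result type:

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val ints = env.fromElements(1, 2, 3)
val strs = env.fromElements("4", "5")

// One mapping function per input side of the connected streams
val merged: DataStream[Int] = ints
  .connect(strs)
  .map(i => i, s => s.toInt)
```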


Asynchronous I/O Operations

High-performance asynchronous I/O operations for external system integration with configurable parallelism, timeouts, and result ordering.

object AsyncDataStream {
  def unorderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN],
    asyncFunction: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]
  
  def orderedWait[IN, OUT: TypeInformation](
    input: DataStream[IN],
    asyncFunction: AsyncFunction[IN, OUT],
    timeout: Long,
    timeUnit: TimeUnit
  ): DataStream[OUT]
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
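A sketch of asynchronous enrichment following the signatures above. The import paths assume Flink 1.3's Scala async package, and the lookup is simulated with an immediately completed Future; a real implementation would call an async client:

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.{ExecutionContext, Future}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.async.{AsyncCollector, AsyncFunction}

// Hypothetical enrichment: replace the Future with a real async client call
class EnrichById extends AsyncFunction[Long, String] {
  implicit val ec: ExecutionContext = ExecutionContext.global
  override def asyncInvoke(id: Long, collector: AsyncCollector[String]): Unit =
    Future(s"user-$id").foreach(v => collector.collect(Iterable(v)))
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val ids = env.fromElements(1L, 2L, 3L)

// unorderedWait emits results as requests complete;
// orderedWait would preserve the input order instead
val enriched = AsyncDataStream.unorderedWait(ids, new EnrichById, 1000L, TimeUnit.MILLISECONDS)
```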


Output Operations and Sinks

Comprehensive output functionality for writing stream results to various destinations including files, databases, message queues, and custom sinks.

class DataStream[T] {
  def print(): DataStreamSink[T]
  def writeAsText(path: String): DataStreamSink[T]
  def writeAsCsv(path: String): DataStreamSink[T]
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T]
  def addSink(fun: T => Unit): DataStreamSink[T]
}
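A sink sketch combining these methods (the output path is a placeholder):

```scala
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val results = env.fromElements("a", "b", "c")

results.print()                    // writes to stdout of each task manager
results.writeAsText("/tmp/out")    // path is illustrative

// Inline sink defined as a plain Scala function
results.addSink(r => println(s"sink received: $r"))

env.execute("sink-example")
```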


Function Interfaces and User-Defined Functions

Type-safe interfaces for implementing custom processing logic including window functions, process functions, and rich functions with lifecycle management.

trait WindowFunction[IN, OUT, KEY, W <: Window] {
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]): Unit
}

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] {
  def process(key: KEY, context: Context, elements: Iterable[IN], out: Collector[OUT]): Unit
  def clear(context: Context): Unit
}

trait AsyncFunction[IN, OUT] {
  def asyncInvoke(input: IN, collector: AsyncCollector[OUT]): Unit
}
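A full WindowFunction implementation sketch: unlike an incremental reduce, apply() receives every element of the window plus the key and window metadata (the counting logic is illustrative):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Emits (key, window end timestamp, element count) once per window
class CountPerWindow
    extends WindowFunction[(String, Int), (String, Long, Int), String, TimeWindow] {
  override def apply(key: String, window: TimeWindow,
                     input: Iterable[(String, Int)],
                     out: Collector[(String, Long, Int)]): Unit =
    out.collect((key, window.getEnd, input.size))
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements(("a", 1), ("a", 2))
  .keyBy(_._1)
  .timeWindow(Time.seconds(10))
  .apply(new CountPerWindow)
```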


Scala Extensions and Partial Functions

Scala-specific extensions that enable partial function support for more idiomatic Scala programming with automatic conversion between partial and total functions.

import org.apache.flink.streaming.api.scala.extensions._

class OnDataStream[T] {
  def mapWith[R: TypeInformation](fun: PartialFunction[T, R]): DataStream[R]
  def filterWith(fun: PartialFunction[T, Boolean]): DataStream[T]
  def flatMapWith[R: TypeInformation](fun: PartialFunction[T, TraversableOnce[R]]): DataStream[R]
}
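A sketch of the partial-function extensions (the case class is illustrative); the pattern-matching lambdas destructure elements in place, which plain map/filter cannot accept directly:

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._

case class Point(x: Int, y: Int)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val points = env.fromElements(Point(1, 2), Point(-3, 4))

// Destructure the case class directly in the lambda
val sums = points.mapWith { case Point(x, y) => x + y }
val rightHalf = points.filterWith { case Point(x, _) => x > 0 }
```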


Types

Core Stream Types

class DataStream[T]
class KeyedStream[T, K] extends DataStream[T]
class ConnectedStreams[IN1, IN2]
class WindowedStream[T, K, W <: Window]
class AllWindowedStream[T, W <: Window]
class SplitStream[T] extends DataStream[T]

Environment and Configuration Types

class StreamExecutionEnvironment
class ExecutionConfig
class CheckpointConfig
class RestartStrategies

Function Interface Types

trait WindowFunction[IN, OUT, KEY, W <: Window]
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
trait AsyncFunction[IN, OUT]
trait AsyncCollector[OUT]

Builder Types

class JoinedStreams[T1, T2]
class CoGroupedStreams[T1, T2]
class OutputTag[T]