Files

docs/: async-io.md, checkpointing.md, datastream-transformations.md, execution-environment.md, index.md, keyed-streams-state.md, process-functions.md, sources-sinks.md, time-watermarks.md, windowing.md
tile.json

tessl/maven-org-apache-flink--flink-streaming-java-2-11

Apache Flink Streaming Java API - Core library for building streaming data processing applications in Java, providing DataStream API, windowing operations, state management, event time processing, and fault-tolerant stream processing capabilities

Workspace: tessl
Visibility: Public
Describes: maven pkg:maven/org.apache.flink/flink-streaming-java_2.11@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-streaming-java-2-11@1.14.0


Apache Flink Streaming Java API

Apache Flink Streaming Java API provides a comprehensive framework for building real-time streaming data processing applications. It offers a rich DataStream API for creating streaming pipelines with operations like map, filter, and windowing, supports advanced event-time processing with watermarks, provides exactly-once processing guarantees through checkpointing, and includes built-in support for various data sources and sinks.

Package Information

  • Package Name: org.apache.flink:flink-streaming-java_2.11
  • Package Type: maven
  • Language: Java
  • Installation: Add to Maven dependencies:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.14.6</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

Basic Usage

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.MapFunction;

public class BasicStreamingJob {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // Create a data stream from a socket
        DataStream<String> text = env.socketTextStream("localhost", 9999);
        
        // Transform the data
        DataStream<String> upperCase = text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
        
        // Output the results
        upperCase.print();
        
        // Execute the streaming job
        env.execute("Basic Streaming Job");
    }
}

Architecture

Apache Flink Streaming Java API is built around several key components (a short sketch after this list shows them working together in a single job):

  • StreamExecutionEnvironment: The main entry point for creating and configuring streaming applications
  • DataStream API: Provides transformation operations for processing unbounded streams of data
  • KeyedStream: Enables stateful operations on partitioned streams with automatic state management
  • Windowing System: Groups elements by time or count for batch-like operations on streams
  • Function Interfaces: User-defined functions for custom processing logic
  • Checkpointing: Provides fault-tolerance and exactly-once processing guarantees
  • Time Semantics: Support for event time, processing time, and ingestion time with watermark handling
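
To make the component list concrete, here is a minimal, self-contained sketch (not taken from the tile docs; class, host, and job names are illustrative) that combines the execution environment, a socket source, a keyed and windowed aggregation, checkpointing, and a print sink:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowedWordCount {
    public static void main(String[] args) throws Exception {
        // StreamExecutionEnvironment: entry point and runtime configuration
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // Checkpointing: snapshot state every 10 seconds

        // Source: one line of text per socket message (e.g. fed locally with `nc -lk 9999`)
        DataStream<String> lines = env.socketTextStream("localhost", 9999);

        DataStream<Tuple2<String, Integer>> counts = lines
            // DataStream API: split lines into (word, 1) pairs
            .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                for (String word : line.toLowerCase().split("\\s+")) {
                    out.collect(Tuple2.of(word, 1));
                }
            })
            .returns(Types.TUPLE(Types.STRING, Types.INT)) // type hint needed for the lambda
            // KeyedStream: partition by word so state and aggregation are per key
            .keyBy(pair -> pair.f0)
            // Windowing: tumbling 1-minute processing-time windows
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            .sum(1);

        counts.print(); // Sink: write results to stdout
        env.execute("Windowed Word Count");
    }
}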

Capabilities

Stream Execution Environment

The main entry point for creating streaming applications, providing methods to configure the runtime environment and create data streams from various sources.

// Get execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create data streams
DataStream<String> fromElements = env.fromElements("hello", "world");
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);
DataStream<String> fromFile = env.readTextFile("path/to/file.txt");
DataStream<T> fromSource = env.addSource(new CustomSourceFunction<T>());

// Execute the job
JobExecutionResult result = env.execute("Job Name");
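
As a runnable counterpart to the signatures above, here is a minimal sketch (illustrative names). createLocalEnvironment() is convenient for tests, while getExecutionEnvironment() adapts automatically to local or cluster execution:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentSetup {
    public static void main(String[] args) throws Exception {
        // Explicit local environment for testing; getExecutionEnvironment() is the usual choice
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(2); // default parallelism for all operators in this job

        // A simple bounded source, useful for trying out pipelines locally
        DataStream<Long> numbers = env.fromSequence(1, 10);
        numbers.map(n -> n * n).print();

        env.execute("Environment Setup Example");
    }
}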


DataStream Transformations

Core data transformation operations for processing unbounded streams, including map, filter, flatMap, and stream composition operations.

// Basic transformations
DataStream<R> mapped = stream.map(MapFunction<T, R> mapper);
DataStream<T> filtered = stream.filter(FilterFunction<T> filter);
DataStream<R> flatMapped = stream.flatMap(FlatMapFunction<T, R> flatMapper);

// Stream partitioning
KeyedStream<T, K> keyed = stream.keyBy(KeySelector<T, K> keySelector);
DataStream<T> shuffled = stream.shuffle();
DataStream<T> rebalanced = stream.rebalance();

// Stream composition
DataStream<T> union = stream.union(otherStream1, otherStream2);
ConnectedStreams<T1, T2> connected = stream1.connect(stream2);
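
The lines above are signature sketches. Below is a minimal, self-contained example (illustrative data and names) chaining the same transformations with lambdas; note that lambdas with generic outputs, such as the flatMap, need an explicit returns(...) hint because of Java type erasure:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationExamples {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> words = env.fromElements("flink", "streaming", "java");

        // map: uppercase each element
        DataStream<String> upper = words.map(w -> w.toUpperCase());

        // filter: keep long words only
        DataStream<String> longWords = upper.filter(w -> w.length() > 5);

        // flatMap: split each word into characters (returns() needed for the lambda)
        DataStream<String> chars = longWords
            .flatMap((String w, Collector<String> out) -> {
                for (char c : w.toCharArray()) {
                    out.collect(String.valueOf(c));
                }
            })
            .returns(Types.STRING);

        // keyBy: partition by the character itself (stateful operations would follow here)
        KeyedStream<String, String> keyed = chars.keyBy(c -> c);

        // union: merge two streams of the same type
        DataStream<String> merged = upper.union(longWords);

        merged.print();
        env.execute("Transformation Examples");
    }
}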


Keyed Streams and State

Stateful operations on partitioned streams enabling aggregations, stateful processing, and exactly-once guarantees through automatic state management.

// Keyed stream operations
KeyedStream<T, K> keyedStream = dataStream.keyBy(keySelector);
DataStream<T> reduced = keyedStream.reduce(ReduceFunction<T> reducer);
DataStream<R> aggregated = keyedStream.aggregate(AggregateFunction<T, ACC, R> aggFunction);

// Built-in aggregations
DataStream<T> sum = keyedStream.sum("fieldName");
DataStream<T> max = keyedStream.max("fieldName");
DataStream<T> min = keyedStream.min("fieldName");

// Stateful processing
DataStream<R> processed = keyedStream.process(KeyedProcessFunction<K, T, R> function);
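
For a concrete picture of keyed state, here is a minimal sketch (class and state names are illustrative) of a KeyedProcessFunction that counts occurrences per key with a ValueState; the runtime scopes the state to the current key automatically:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts how many times each key has been seen, using keyed ValueState
public class CountPerKey extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        // The runtime binds this state to the key of the element being processed
        countState = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Types.LONG));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        Long current = countState.value();
        long next = (current == null ? 0L : current) + 1;
        countState.update(next);
        out.collect(value + " seen " + next + " times");
    }
}

// Usage: dataStream.keyBy(s -> s).process(new CountPerKey())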


Windowing Operations

Group stream elements by time or count for batch-like operations on unbounded streams, with support for tumbling, sliding, and session windows.

// Time-based windows (the timeWindow() shortcuts are deprecated; prefer explicit window assigners)
WindowedStream<T, K, TimeWindow> tumblingWindow = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(5)));
WindowedStream<T, K, TimeWindow> slidingWindow = keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)));

// Count-based windows  
WindowedStream<T, K, GlobalWindow> countWindow = keyedStream.countWindow(100);

// Custom windows
WindowedStream<T, K, W> customWindow = keyedStream.window(WindowAssigner<T, K, W> assigner);

// Window operations
DataStream<R> windowResult = windowedStream.reduce(ReduceFunction<T> function);
DataStream<R> windowApply = windowedStream.apply(WindowFunction<T, R, K, W> function);
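
The following minimal sketch (illustrative sensor data and names) computes a per-key maximum over tumbling processing-time windows. Note that with a tiny bounded source like this the job can finish before a processing-time window fires, so in practice an unbounded source would be used:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowedMax {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical (sensorId, reading) pairs; a real job would read from Kafka, a socket, etc.
        DataStream<Tuple2<String, Double>> readings = env.fromElements(
            Tuple2.of("sensor-1", 21.5),
            Tuple2.of("sensor-1", 23.0),
            Tuple2.of("sensor-2", 19.8));

        // Per-sensor maximum over tumbling 10-second processing-time windows
        DataStream<Tuple2<String, Double>> maxPerWindow = readings
            .keyBy(r -> r.f0)
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .reduce((a, b) -> a.f1 >= b.f1 ? a : b);

        maxPerWindow.print();
        env.execute("Windowed Max");
    }
}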


Process Functions

Rich processing functions that provide access to timers, state, and side outputs for complex stream processing logic.

// Process functions
DataStream<R> processed = stream.process(ProcessFunction<T, R> function);
DataStream<R> keyedProcessed = keyedStream.process(KeyedProcessFunction<K, T, R> function);
DataStream<R> windowProcessed = windowedStream.process(ProcessWindowFunction<T, R, K, W> function);

// Connected stream processing
DataStream<R> coProcessed = connectedStreams.process(CoProcessFunction<T1, T2, R> function);
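
As a concrete example of side outputs, the sketch below (illustrative names) routes records that fail to parse to a side output identified by an OutputTag, while parsed values continue on the main output:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputExample {

    // OutputTag must be an anonymous subclass (note the {}) so its generic type is preserved
    private static final OutputTag<String> INVALID = new OutputTag<String>("invalid") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> input = env.fromElements("1", "2", "oops", "4");

        // Emit parsed integers on the main output, route non-numeric records to the side output
        SingleOutputStreamOperator<Integer> parsed = input.process(new ProcessFunction<String, Integer>() {
            @Override
            public void processElement(String value, Context ctx, Collector<Integer> out) {
                try {
                    out.collect(Integer.parseInt(value));
                } catch (NumberFormatException e) {
                    ctx.output(INVALID, value);
                }
            }
        });

        parsed.print();                          // main output: 1, 2, 4
        parsed.getSideOutput(INVALID).print();   // side output: "oops"
        env.execute("Side Output Example");
    }
}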


Async I/O Operations

Asynchronous I/O operations for efficient external system integration without blocking stream processing.

// Ordered async processing
SingleOutputStreamOperator<OUT> orderedAsync = AsyncDataStream.orderedWait(
    dataStream, 
    AsyncFunction<IN, OUT> asyncFunction,
    1000, TimeUnit.MILLISECONDS
);

// Unordered async processing
SingleOutputStreamOperator<OUT> unorderedAsync = AsyncDataStream.unorderedWait(
    dataStream,
    AsyncFunction<IN, OUT> asyncFunction, 
    1000, TimeUnit.MILLISECONDS
);
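
A minimal sketch of an AsyncFunction (names are illustrative, and the external lookup is simulated with a CompletableFuture rather than a real client). orderedWait preserves input order, and the extra capacity argument caps the number of in-flight requests:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncEnrichmentJob {

    // Pretends to enrich each id via an asynchronous external call (HTTP, database, ...)
    static class EnrichFunction extends RichAsyncFunction<String, String> {
        @Override
        public void asyncInvoke(String id, ResultFuture<String> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> id + " -> enriched")   // simulated external call
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
        }

        @Override
        public void timeout(String id, ResultFuture<String> resultFuture) {
            // Emit a fallback value instead of failing the job on timeout
            resultFuture.complete(Collections.singleton(id + " -> timeout"));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> ids = env.fromElements("a", "b", "c");

        // Up to 100 in-flight requests, each with a 1-second timeout; results keep input order
        SingleOutputStreamOperator<String> enriched =
            AsyncDataStream.orderedWait(ids, new EnrichFunction(), 1, TimeUnit.SECONDS, 100);

        enriched.print();
        env.execute("Async Enrichment");
    }
}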


Sources and Sinks

Built-in and custom data sources for ingesting data into streams, and sinks for outputting processed results to external systems.

// Built-in sources
DataStream<String> elements = env.fromElements("a", "b", "c");
DataStream<String> collection = env.fromCollection(Arrays.asList("x", "y", "z"));
DataStream<String> socket = env.socketTextStream("localhost", 9999);
DataStream<String> file = env.readTextFile("path/file.txt");

// Custom sources
DataStream<T> custom = env.addSource(SourceFunction<T> sourceFunction);

// Built-in sinks
stream.print();
stream.writeAsText("output/path");
stream.addSink(SinkFunction<T> sinkFunction);
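
A minimal sketch of a custom SourceFunction and a custom SinkFunction (illustrative names). The source emits records under the checkpoint lock so emission stays consistent with checkpoints, and cancel() stops its run loop:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CustomSourceSinkJob {

    // Emits an increasing counter once per second until cancelled
    static class CounterSource implements SourceFunction<Long> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long counter = 0;
            while (running) {
                // Emit under the checkpoint lock so emission and checkpoint state stay consistent
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> numbers = env.addSource(new CounterSource());

        // A trivial custom sink that writes each record to stdout
        numbers.addSink(new SinkFunction<Long>() {
            @Override
            public void invoke(Long value, Context context) {
                System.out.println("value = " + value);
            }
        });

        env.execute("Custom Source and Sink");
    }
}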


Time and Watermarks

Event time processing with watermark generation for handling out-of-order events and late data in streaming applications.

// Set time characteristic (deprecated in newer versions)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Watermark strategies
WatermarkStrategy<T> strategy = WatermarkStrategy
    .<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((element, timestamp) -> element.getTimestamp());

// Assign watermarks
SingleOutputStreamOperator<T> withWatermarks = stream
    .assignTimestampsAndWatermarks(strategy);
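
A minimal end-to-end sketch (events modeled as illustrative (key, epoch-millis) tuples) that assigns bounded-out-of-orderness watermarks and aggregates in event-time windows. With a bounded source Flink emits a final watermark, so the windows fire when the input ends:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical (key, epochMillis) events; in practice these come from Kafka or similar
        DataStream<Tuple2<String, Long>> events = env.fromElements(
            Tuple2.of("a", 1_000L), Tuple2.of("a", 1_500L), Tuple2.of("b", 2_000L));

        // Tolerate up to 5 seconds of out-of-orderness; read the event timestamp from field f1
        DataStream<Tuple2<String, Long>> withTimestamps = events.assignTimestampsAndWatermarks(
            WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((event, previousTimestamp) -> event.f1));

        // Latest event timestamp per key in each 10-second event-time window
        DataStream<Tuple2<String, Long>> latestPerWindow = withTimestamps
            .keyBy(e -> e.f0)
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .reduce((a, b) -> Tuple2.of(a.f0, Math.max(a.f1, b.f1)));

        latestPerWindow.print();
        env.execute("Event Time Example");
    }
}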


Checkpointing and Fault Tolerance

Configuration and management of checkpoints for fault-tolerant stream processing with exactly-once guarantees.

// Enable checkpointing
env.enableCheckpointing(5000); // checkpoint every 5 seconds

// Configure checkpointing
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(500);
config.setCheckpointTimeout(60000);
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
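
Building on the snippet above, here is a fuller configuration sketch. Note that the state backend and restart-strategy classes (HashMapStateBackend, RestartStrategies) live in related Flink modules rather than in flink-streaming-java itself, and the checkpoint path is illustrative:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 10 seconds
        env.enableCheckpointing(10_000);

        CheckpointConfig config = env.getCheckpointConfig();
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        config.setMinPauseBetweenCheckpoints(500);      // at least 0.5s between checkpoints
        config.setCheckpointTimeout(60_000);            // abort checkpoints taking longer than 1 min
        config.setTolerableCheckpointFailureNumber(2);  // tolerate a couple of failed checkpoints
        config.enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Keep operator state on the JVM heap and store checkpoints on a filesystem path
        env.setStateBackend(new HashMapStateBackend());
        config.setCheckpointStorage("file:///tmp/flink-checkpoints");

        // Restart the job up to 3 times, waiting 10 seconds between attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));

        // ... define sources, transformations, and sinks here, then call env.execute(...)
    }
}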


Types

Core Stream Types

// Main stream types
class DataStream<T> {
    // Transformation methods
    <R> DataStream<R> map(MapFunction<T, R> mapper);
    DataStream<T> filter(FilterFunction<T> filter);
    <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper);
    KeyedStream<T, K> keyBy(KeySelector<T, K> keySelector);
    DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
}

class KeyedStream<T, K> {
    // Stateful operations
    DataStream<T> reduce(ReduceFunction<T> reducer);
    <R> DataStream<R> aggregate(AggregateFunction<T, ACC, R> aggFunction);
    <R> DataStream<R> process(KeyedProcessFunction<K, T, R> function);
    WindowedStream<T, K, GlobalWindow> countWindow(long size);
    // timeWindow(Time) shortcuts are deprecated; prefer window(...) with an explicit assigner
    <W extends Window> WindowedStream<T, K, W> window(WindowAssigner<? super T, W> assigner);
}

class SingleOutputStreamOperator<T> extends DataStream<T> {
    // Operator configuration
    SingleOutputStreamOperator<T> name(String name);
    SingleOutputStreamOperator<T> uid(String uid);
    SingleOutputStreamOperator<T> setParallelism(int parallelism);
}

Environment and Configuration

abstract class StreamExecutionEnvironment {
    // Factory methods
    static StreamExecutionEnvironment getExecutionEnvironment();
    static StreamExecutionEnvironment createLocalEnvironment();
    
    // Source creation
    <T> DataStreamSource<T> fromElements(T... data);
    <T> DataStreamSource<T> addSource(SourceFunction<T> function);
    DataStreamSource<String> socketTextStream(String hostname, int port);
    
    // Execution
    JobExecutionResult execute() throws Exception;
    JobExecutionResult execute(String jobName) throws Exception;
    
    // Configuration
    StreamExecutionEnvironment setParallelism(int parallelism);
    void enableCheckpointing(long interval);
    CheckpointConfig getCheckpointConfig();
}

Function Interfaces

// Core function interfaces
interface MapFunction<T, O> extends Function {
    O map(T value) throws Exception;
}

interface FilterFunction<T> extends Function {
    boolean filter(T value) throws Exception;
}

interface FlatMapFunction<T, O> extends Function {
    void flatMap(T value, Collector<O> out) throws Exception;
}

interface ReduceFunction<T> extends Function {
    T reduce(T value1, T value2) throws Exception;
}

interface KeySelector<IN, KEY> extends Function {
    KEY getKey(IN value) throws Exception;
}

// Rich processing functions
abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}

abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}