Apache Flink Streaming Java API: the core library for building streaming data-processing applications in Java. It provides the DataStream API, windowing operations, state management, event-time processing, and fault-tolerant stream processing.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-streaming-java-2-11@1.14.0
Apache Flink Streaming Java API provides a comprehensive framework for building real-time streaming data-processing applications. It offers a rich DataStream API for creating streaming pipelines with operations such as map, filter, and windowing; supports event-time processing with watermarks; provides exactly-once state guarantees through checkpointing; and includes built-in support for various data sources and sinks.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.14.6</version>
</dependency>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.MapFunction;
/**
 * Minimal Flink streaming job: reads lines of text from a socket, upper-cases each line and
 * prints the result.
 *
 * <p>To try it locally, start a text server first, e.g. {@code nc -lk 9999}, then run the job and
 * type lines into the server.
 */
public class BasicStreamingJob {

    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a data stream from a socket: one String element per line received
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // Transform the data. Locale.ROOT makes the result independent of the JVM's default
        // locale (with a Turkish default locale, toUpperCase() would map "i" to "İ").
        DataStream<String> upperCase = text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase(java.util.Locale.ROOT);
            }
        });

        // Output the results
        upperCase.print();

        // Execute the streaming job; the pipeline above only describes the job, and nothing
        // runs until execute() is called
        env.execute("Basic Streaming Job");
    }
}
Apache Flink Streaming Java API is built around several key components:
`StreamExecutionEnvironment` is the main entry point for creating streaming applications: it provides methods to configure the runtime environment and to create data streams from various sources.
// Illustrative snippet: T and CustomSourceFunction are placeholders that are not defined here.
// Get execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create data streams
// fromElements: a stream over the literal values given
DataStream<String> fromElements = env.fromElements("hello", "world");
// socketTextStream: one String element per line read from host:port
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);
// readTextFile: one String element per line of the file
DataStream<String> fromFile = env.readTextFile("path/to/file.txt");
// addSource: wraps a user-supplied source implementation
DataStream<T> fromSource = env.addSource(new CustomSourceFunction<T>());
// Execute the job
// Nothing runs until execute() is called; it returns a JobExecutionResult for the submitted job
JobExecutionResult result = env.execute("Job Name");Core data transformation operations for processing unbounded streams, including map, filter, flatMap, and stream composition operations.
// Signature-style pseudo-code: each argument is written as "Type name" to show the parameter
// type the method expects; real code passes an instance (a class or a lambda) instead.
// Basic transformations
// map: exactly one output per input; filter: keeps elements for which the function returns
// true; flatMap: zero or more outputs per input
DataStream<R> mapped = stream.map(MapFunction<T, R> mapper);
DataStream<T> filtered = stream.filter(FilterFunction<T> filter);
DataStream<R> flatMapped = stream.flatMap(FlatMapFunction<T, R> flatMapper);
// Stream partitioning
// keyBy: routes all elements with the same key to the same parallel instance
KeyedStream<T, K> keyed = stream.keyBy(KeySelector<T, K> keySelector);
// shuffle: random redistribution; rebalance: round-robin redistribution
DataStream<T> shuffled = stream.shuffle();
DataStream<T> rebalanced = stream.rebalance();
// Stream composition
// union: merges streams of the same element type T into one stream
DataStream<T> union = stream.union(otherStream1, otherStream2);
// connect: pairs two streams whose element types (T1, T2) may differ
ConnectedStreams<T1, T2> connected = stream1.connect(stream2);Stateful operations on partitioned streams enabling aggregations, stateful processing, and exactly-once guarantees through automatic state management.
// Keyed stream operations (pseudo-code: arguments show the expected function type).
// keyBy routes all elements with the same key to the same parallel instance; state is per key.
KeyedStream<T, K> keyedStream = dataStream.keyBy(keySelector);
// Rolling reduce: emits the updated per-key result for every incoming element
DataStream<T> reduced = keyedStream.reduce(ReduceFunction<T> reducer);
// KeyedStream has no public aggregate(AggregateFunction) method: an AggregateFunction is
// applied per window, through WindowedStream#aggregate.
DataStream<R> aggregated = keyedStream.window(WindowAssigner<T, W> assigner).aggregate(AggregateFunction<T, ACC, R> aggFunction);
// Built-in rolling aggregations. "fieldName" is a field expression on T; tuple positions such
// as sum(0) are accepted too. max/min only guarantee the aggregated field; use maxBy/minBy to
// get the whole element that holds the extreme value.
DataStream<T> sum = keyedStream.sum("fieldName");
DataStream<T> max = keyedStream.max("fieldName");
DataStream<T> min = keyedStream.min("fieldName");
// Stateful processing with access to keyed state and timers
DataStream<R> processed = keyedStream.process(KeyedProcessFunction<K, T, R> function);
Group stream elements by time or count for batch-like operations on unbounded streams, with support for tumbling, sliding, and session windows.
// Time-based windows. KeyedStream#timeWindow(...) is deprecated since Flink 1.12; pass an
// explicit event-time (or processing-time) window assigner to window(...) instead.
WindowedStream<T, K, TimeWindow> tumblingWindow = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(5)));
WindowedStream<T, K, TimeWindow> slidingWindow = keyedStream.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)));
// Session windows close after a per-key gap of inactivity (here 10 minutes)
WindowedStream<T, K, TimeWindow> sessionWindow = keyedStream.window(EventTimeSessionWindows.withGap(Time.minutes(10)));
// Count-based windows: fire once a key has collected 100 elements
WindowedStream<T, K, GlobalWindow> countWindow = keyedStream.countWindow(100);
// Custom windows. WindowAssigner has two type parameters: the element type and the window type.
WindowedStream<T, K, W> customWindow = keyedStream.window(WindowAssigner<T, W> assigner);
// Window operations (pseudo-code: arguments show the expected function type).
// reduce keeps the element type T; apply can emit a different type R.
DataStream<T> windowResult = windowedStream.reduce(ReduceFunction<T> function);
DataStream<R> windowApply = windowedStream.apply(WindowFunction<T, R, K, W> function);
Rich processing functions that provide access to timers, state, and side outputs for complex stream processing logic.
// Process functions (pseudo-code: arguments show the expected function type)
// ProcessFunction<T, R> on a plain DataStream<T>
DataStream<R> processed = stream.process(ProcessFunction<T, R> function);
// KeyedProcessFunction's first type parameter is the key type K of the KeyedStream
DataStream<R> keyedProcessed = keyedStream.process(KeyedProcessFunction<K, T, R> function);
// ProcessWindowFunction receives all elements of a window at once, together with key K and window W
DataStream<R> windowProcessed = windowedStream.process(ProcessWindowFunction<T, R, K, W> function);
// Connected stream processing
// CoProcessFunction handles elements of both inputs (T1 and T2) and emits R
DataStream<R> coProcessed = connectedStreams.process(CoProcessFunction<T1, T2, R> function);Asynchronous I/O operations for efficient external system integration without blocking stream processing.
// Ordered async processing
// orderedWait: results are emitted in the same order as the input elements arrived.
// The last two arguments are the timeout for each async request (1000 ms here).
SingleOutputStreamOperator<OUT> orderedAsync = AsyncDataStream.orderedWait(
dataStream,
AsyncFunction<IN, OUT> asyncFunction,
1000, TimeUnit.MILLISECONDS
);
// Unordered async processing
// unorderedWait: each result is emitted as soon as its request completes, so the output order
// may differ from the input order.
SingleOutputStreamOperator<OUT> unorderedAsync = AsyncDataStream.unorderedWait(
dataStream,
AsyncFunction<IN, OUT> asyncFunction,
1000, TimeUnit.MILLISECONDS
);Built-in and custom data sources for ingesting data into streams, and sinks for outputting processed results to external systems.
// Built-in sources
// fromElements / fromCollection: finite streams over the given values
DataStream<String> elements = env.fromElements("a", "b", "c");
DataStream<String> collection = env.fromCollection(Arrays.asList("x", "y", "z"));
DataStream<String> socket = env.socketTextStream("localhost", 9999);
DataStream<String> file = env.readTextFile("path/file.txt");
// Custom sources
// (pseudo-code: the argument shows the expected type, a SourceFunction<T> implementation)
DataStream<T> custom = env.addSource(SourceFunction<T> sourceFunction);
// Built-in sinks
// print: writes each element's toString() to standard output
stream.print();
// NOTE(review): writeAsText is marked @Deprecated in recent Flink releases in favour of the
// file sink connectors (StreamingFileSink / FileSink) — confirm for the targeted version.
stream.writeAsText("output/path");
stream.addSink(SinkFunction<T> sinkFunction);Event time processing with watermark generation for handling out-of-order events and late data in streaming applications.
// Set time characteristic. Deprecated since Flink 1.12, where event time became the default,
// so on 1.14 this call is redundant and can be dropped.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Watermark strategies
// Tolerates events arriving up to 5 seconds out of order. The lambda returns each element's
// event timestamp (epoch milliseconds); its second argument is the record's previously
// assigned timestamp, unused here. element.getTimestamp() assumes T exposes such a getter.
WatermarkStrategy<T> strategy = WatermarkStrategy
.<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((element, timestamp) -> element.getTimestamp());
// Assign watermarks
SingleOutputStreamOperator<T> withWatermarks = stream
.assignTimestampsAndWatermarks(strategy);Configuration and management of checkpoints for fault-tolerant stream processing with exactly-once guarantees.
// Enable checkpointing
env.enableCheckpointing(5000); // checkpoint every 5 seconds
// Configure checkpointing (all durations below are in milliseconds)
CheckpointConfig config = env.getCheckpointConfig();
// EXACTLY_ONCE is also the default mode
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Leave at least 500 ms between the end of one checkpoint and the start of the next
config.setMinPauseBetweenCheckpoints(500);
// Abort a checkpoint that has not completed within 60 s
config.setCheckpointTimeout(60000);
// Retain externalized checkpoint data when the job is cancelled so it can be restored later.
// ExternalizedCheckpointCleanup is nested in CheckpointConfig (import it or qualify it).
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);Checkpointing and Fault Tolerance
// Main stream types (signature summary: method bodies and most modifiers are omitted)
class DataStream<T> {
    // Transformation methods. They return SingleOutputStreamOperator, a DataStream subclass that
    // also exposes operator configuration such as name(), uid() and setParallelism().
    <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper);
    SingleOutputStreamOperator<T> filter(FilterFunction<T> filter);
    <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper);
    // K is a method-level type parameter: the key type is inferred from the KeySelector
    <K> KeyedStream<T, K> keyBy(KeySelector<T, K> keySelector);
    DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
}
class KeyedStream<T, K> extends DataStream<T> {
    // Stateful operations; state is scoped to the current key
    SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer);
    <R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<K, T, R> function);
    // Windowing. An AggregateFunction is applied per window through WindowedStream#aggregate;
    // KeyedStream itself has no public aggregate(AggregateFunction) method.
    <W extends Window> WindowedStream<T, K, W> window(WindowAssigner<? super T, W> assigner);
    WindowedStream<T, K, GlobalWindow> countWindow(long size);
    /**
     * @deprecated since Flink 1.12; use {@code window(TumblingEventTimeWindows.of(size))} or
     *     {@code window(TumblingProcessingTimeWindows.of(size))} instead.
     */
    @Deprecated
    WindowedStream<T, K, TimeWindow> timeWindow(Time size);
}
class SingleOutputStreamOperator<T> extends DataStream<T> {
    // Operator configuration; each setter returns this operator so calls can be chained
    SingleOutputStreamOperator<T> name(String name);
    SingleOutputStreamOperator<T> uid(String uid);
    SingleOutputStreamOperator<T> setParallelism(int parallelism);
}

// In Flink 1.14 StreamExecutionEnvironment is a concrete class (LocalStreamEnvironment and
// RemoteStreamEnvironment extend it), not an abstract one.
class StreamExecutionEnvironment {
    // Factory methods
    static StreamExecutionEnvironment getExecutionEnvironment();
    static LocalStreamEnvironment createLocalEnvironment();
    // Source creation
    @SafeVarargs
    final <T> DataStreamSource<T> fromElements(T... data);
    <T> DataStreamSource<T> addSource(SourceFunction<T> function);
    DataStreamSource<String> socketTextStream(String hostname, int port);
    // Execution: nothing runs until one of these is called
    JobExecutionResult execute() throws Exception;
    JobExecutionResult execute(String jobName) throws Exception;
    // Configuration; setters return the environment so calls can be chained
    StreamExecutionEnvironment setParallelism(int parallelism);
    StreamExecutionEnvironment enableCheckpointing(long interval);
    CheckpointConfig getCheckpointConfig();
}
// Core function interfaces
/**
 * Transforms each input element into exactly one output element (used by {@code DataStream#map}).
 *
 * @param <T> input element type
 * @param <O> output element type
 */
@FunctionalInterface
interface MapFunction<T, O> extends Function {
    /** Returns the element that replaces {@code value} in the output stream. */
    O map(T value) throws Exception;
}
/**
 * Decides per element whether it stays in the stream (used by {@code DataStream#filter}).
 *
 * @param <T> element type
 */
@FunctionalInterface
interface FilterFunction<T> extends Function {
    /** Returns {@code true} to keep {@code value}, {@code false} to drop it. */
    boolean filter(T value) throws Exception;
}
/**
 * Transforms each input element into zero, one or more output elements (used by
 * {@code DataStream#flatMap}).
 *
 * @param <T> input element type
 * @param <O> output element type
 */
@FunctionalInterface
interface FlatMapFunction<T, O> extends Function {
    /** Emits any number of results for {@code value} through {@code out}. */
    void flatMap(T value, Collector<O> out) throws Exception;
}
/**
 * Combines two values of the same type into one. Used for rolling reductions on a
 * {@code KeyedStream} and for windowed reductions on a {@code WindowedStream}.
 *
 * @param <T> element type (input and result)
 */
@FunctionalInterface
interface ReduceFunction<T> extends Function {
    /** Returns the combination of {@code value1} and {@code value2}. */
    T reduce(T value1, T value2) throws Exception;
}
/**
 * Extracts the partitioning key of an element (used by {@code DataStream#keyBy}). The selector
 * must be deterministic: the same element has to yield the same key every time, otherwise
 * elements and their keyed state are routed inconsistently.
 *
 * @param <IN> element type
 * @param <KEY> key type
 */
@FunctionalInterface
interface KeySelector<IN, KEY> extends Function {
    /** Returns the key of {@code value}. */
    KEY getKey(IN value) throws Exception;
}
// Rich processing functions
/**
 * Low-level per-element function with access to a {@code Context} and, via
 * {@code OnTimerContext}, to timer callbacks (both are nested types omitted from this summary).
 * Timers can only be registered when the function runs on a keyed stream.
 *
 * @param <I> input element type
 * @param <O> output element type
 */
abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    /**
     * Processes one element and emits any number of results through {@code out}. Public so that
     * user subclasses in other packages can implement it.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a previously registered timer fires. Does nothing by default; override it to
     * react to timers.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
}
/**
 * Per-element function for a {@code KeyedStream}, with access to keyed state, a
 * {@code Context} and timer callbacks via {@code OnTimerContext} (nested types omitted from this
 * summary).
 *
 * @param <K> key type of the keyed stream this function is applied to
 * @param <I> input element type
 * @param <O> output element type
 */
abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    /**
     * Processes one element of the current key and emits any number of results through
     * {@code out}. Public so that user subclasses in other packages can implement it.
     */
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    /**
     * Called when a timer registered for the current key fires. Does nothing by default;
     * override it to react to timers.
     */
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
}