# Apache Flink Streaming Java API

The Apache Flink Streaming Java API is a framework for building real-time stream processing applications. Its DataStream API composes pipelines from operations such as map, filter, and windowing. It supports event-time processing with watermarks, uses checkpointing to provide exactly-once state consistency, and includes built-in data sources and sinks.

## Package Information

- **Package Name**: org.apache.flink:flink-streaming-java_2.11
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to Maven dependencies:
```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
```

## Basic Usage

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.MapFunction;

public class BasicStreamingJob {
    public static void main(String[] args) throws Exception {
        // Create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create a data stream from a socket
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // Transform the data
        DataStream<String> upperCase = text.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });

        // Output the results
        upperCase.print();

        // Execute the streaming job
        env.execute("Basic Streaming Job");
    }
}
```

## Architecture

Apache Flink Streaming Java API is built around several key components:

- **StreamExecutionEnvironment**: The main entry point for creating and configuring streaming applications
- **DataStream API**: Provides transformation operations for processing unbounded streams of data
- **KeyedStream**: Enables stateful operations on partitioned streams with automatic state management
- **Windowing System**: Groups elements by time or count for batch-like operations on streams
- **Function Interfaces**: User-defined functions for custom processing logic
- **Checkpointing**: Provides fault-tolerance and exactly-once processing guarantees
- **Time Semantics**: Support for event time, processing time, and ingestion time with watermark handling

## Capabilities

### Stream Execution Environment

The main entry point for creating streaming applications, providing methods to configure the runtime environment and create data streams from various sources.

```java { .api }
// Get execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create data streams
DataStream<String> fromElements = env.fromElements("hello", "world");
DataStream<String> fromSocket = env.socketTextStream("localhost", 9999);
DataStream<String> fromFile = env.readTextFile("path/to/file.txt");
DataStream<T> fromSource = env.addSource(new CustomSourceFunction<T>());

// Execute the job
JobExecutionResult result = env.execute("Job Name");
```

[Stream Execution Environment](./execution-environment.md)

### DataStream Transformations

Core data transformation operations for processing unbounded streams, including map, filter, flatMap, and stream composition operations.

```java { .api }
// Basic transformations
DataStream<R> mapped = stream.map(MapFunction<T, R> mapper);
DataStream<T> filtered = stream.filter(FilterFunction<T> filter);
DataStream<R> flatMapped = stream.flatMap(FlatMapFunction<T, R> flatMapper);

// Stream partitioning
KeyedStream<T, K> keyed = stream.keyBy(KeySelector<T, K> keySelector);
DataStream<T> shuffled = stream.shuffle();
DataStream<T> rebalanced = stream.rebalance();

// Stream composition
DataStream<T> union = stream.union(otherStream1, otherStream2);
ConnectedStreams<T1, T2> connected = stream1.connect(stream2);
```

[DataStream Transformations](./datastream-transformations.md)
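
To make the guarantee behind `keyBy` concrete (all elements with the same key reach the same parallel subtask), the following plain-Java sketch models a deterministic key-to-subtask mapping. It is a simplified illustration under stated assumptions, not Flink's implementation: the class and method names are hypothetical, and it applies `Math.floorMod` to `hashCode()` where Flink uses its own hashing and key-group scheme.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of hash partitioning; not part of the Flink API.
class KeyPartitioningSketch {

    // Maps a key to a subtask index in [0, parallelism).
    // Assumption: plain hashCode() stands in for Flink's internal hashing.
    static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Distributes elements to per-subtask lists, using each element as its own key.
    static List<List<String>> partition(List<String> elements, int parallelism) {
        List<List<String>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (String element : elements) {
            subtasks.get(subtaskFor(element, parallelism)).add(element);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // Equal keys always land in the same list, whatever their arrival order.
        System.out.println(partition(Arrays.asList("a", "b", "a", "c", "b"), 2));
    }
}
```

Because the mapping depends only on the key, per-key state can live on a single subtask without coordination between subtasks.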

### Keyed Streams and State

Stateful operations on partitioned streams: rolling aggregations and stateful processing, with per-key state that Flink manages and includes in checkpoints.

```java { .api }
// Keyed stream operations
KeyedStream<T, K> keyedStream = dataStream.keyBy(keySelector);
DataStream<T> reduced = keyedStream.reduce(ReduceFunction<T> reducer);

// AggregateFunction is applied to a WindowedStream, not directly to a KeyedStream
DataStream<R> aggregated = keyedStream.window(assigner).aggregate(AggregateFunction<T, ACC, R> aggFunction);

// Built-in aggregations
DataStream<T> sum = keyedStream.sum("fieldName");
DataStream<T> max = keyedStream.max("fieldName");
DataStream<T> min = keyedStream.min("fieldName");

// Stateful processing
DataStream<R> processed = keyedStream.process(KeyedProcessFunction<K, T, R> function);
```

[Keyed Streams and State](./keyed-streams-state.md)
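
The rolling behavior of a keyed `reduce` can be sketched in plain Java: each incoming element is combined with the value stored for its key, and the updated value is emitted. This is only a model of the semantics, with a hypothetical class name and a `HashMap` standing in for Flink's managed keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of a keyed rolling reduce; not part of the Flink API.
class KeyedReduceSketch {

    // Per-key state: the current reduced value for each key.
    // Assumption: a HashMap stands in for Flink's managed keyed state.
    private final Map<String, Integer> state = new HashMap<>();
    private final BinaryOperator<Integer> reducer;

    KeyedReduceSketch(BinaryOperator<Integer> reducer) {
        this.reducer = reducer;
    }

    // Combines the element with the stored value for its key and returns the new value.
    int process(String key, int value) {
        return state.merge(key, value, reducer);
    }

    public static void main(String[] args) {
        KeyedReduceSketch sum = new KeyedReduceSketch(Integer::sum);
        List<Integer> emitted = new ArrayList<>();
        emitted.add(sum.process("a", 1));
        emitted.add(sum.process("b", 2));
        emitted.add(sum.process("a", 3));
        System.out.println(emitted); // prints [1, 2, 4]
    }
}
```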

### Windowing Operations

Group stream elements by time or count for batch-like operations on unbounded streams, with support for tumbling, sliding, and session windows.

```java { .api }
// Time-based windows (timeWindow is deprecated since Flink 1.12; prefer window(...) with an explicit assigner)
WindowedStream<T, K, TimeWindow> timeWindow = keyedStream.timeWindow(Time.minutes(5));
WindowedStream<T, K, TimeWindow> slidingWindow = keyedStream.timeWindow(Time.minutes(5), Time.minutes(1));

// Count-based windows
WindowedStream<T, K, GlobalWindow> countWindow = keyedStream.countWindow(100);

// Custom windows
WindowedStream<T, K, W> customWindow = keyedStream.window(WindowAssigner<T, K, W> assigner);

// Window operations (reduce keeps the element type T)
DataStream<T> windowResult = windowedStream.reduce(ReduceFunction<T> function);
DataStream<R> windowApply = windowedStream.apply(WindowFunction<T, R, K, W> function);
```

[Windowing Operations](./windowing.md)
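
As a worked example of how time windows group elements, the sketch below computes which tumbling window, and which sliding windows, contain a given timestamp. It is plain Java with hypothetical names and assumes windows are aligned to the epoch with no offset. It illustrates the arithmetic only and is not Flink's assigner code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of time-window assignment; not part of the Flink API.
class WindowAssignmentSketch {

    // Start of the tumbling window [start, start + size) that contains the timestamp.
    // Assumption: windows are aligned to epoch 0 with no offset; all values in milliseconds.
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Starts of all sliding windows [start, start + size) that contain the timestamp,
    // from the most recent window back to the oldest.
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(7_000L, 5_000L));         // prints 5000
        System.out.println(slidingStarts(7_000L, 5_000L, 1_000L)); // prints [7000, 6000, 5000, 4000, 3000]
    }
}
```

With a 5-second size and a 1-second slide, each element belongs to five overlapping windows, which is why sliding windows cost more state than tumbling windows of the same size.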

### Process Functions

Rich processing functions that provide access to timers, state, and side outputs for complex stream processing logic.

```java { .api }
// Process functions
DataStream<R> processed = stream.process(ProcessFunction<T, R> function);
DataStream<R> keyedProcessed = keyedStream.process(KeyedProcessFunction<K, T, R> function);
DataStream<R> windowProcessed = windowedStream.process(ProcessWindowFunction<T, R, K, W> function);

// Connected stream processing
DataStream<R> coProcessed = connectedStreams.process(CoProcessFunction<T1, T2, R> function);
```

[Process Functions](./process-functions.md)
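
The way event-time timers interact with watermarks can be modeled in a few lines of plain Java: timers are kept in timestamp order and fire once the watermark reaches or passes them. The class below is a hypothetical single-key model written for illustration. It does not use or reproduce Flink's timer service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical single-key model of event-time timers; not part of the Flink API.
class TimerSketch {

    // Registered timer timestamps in ascending order; registering the same timestamp twice keeps one timer.
    private final TreeSet<Long> timers = new TreeSet<>();

    void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Advances the watermark and returns the timers that fire, in timestamp order.
    List<Long> advanceWatermark(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= watermark) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        TimerSketch sketch = new TimerSketch();
        sketch.registerEventTimeTimer(300L);
        sketch.registerEventTimeTimer(100L);
        sketch.registerEventTimeTimer(200L);
        System.out.println(sketch.advanceWatermark(200L)); // prints [100, 200]
        System.out.println(sketch.pendingTimers());        // prints 1
    }
}
```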

### Async I/O Operations

Asynchronous I/O operations for efficient external system integration without blocking stream processing.

```java { .api }
// Ordered async processing
SingleOutputStreamOperator<OUT> orderedAsync = AsyncDataStream.orderedWait(
    dataStream,
    AsyncFunction<IN, OUT> asyncFunction,
    1000, TimeUnit.MILLISECONDS
);

// Unordered async processing
SingleOutputStreamOperator<OUT> unorderedAsync = AsyncDataStream.unorderedWait(
    dataStream,
    AsyncFunction<IN, OUT> asyncFunction,
    1000, TimeUnit.MILLISECONDS
);
```

[Async I/O Operations](./async-io.md)
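
The difference between ordered and unordered waiting can be illustrated with `CompletableFuture` from the JDK. In this sketch (hypothetical class and method names, no Flink dependency), the ordered variant emits results in input order even when later requests finish first, while the unordered variant emits each result as soon as it completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of ordered vs. unordered result emission; not part of the Flink API.
class AsyncOrderSketch {

    // Ordered: waits on each request in input order, so output order equals input order.
    static List<String> orderedResults(List<CompletableFuture<String>> requests) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> request : requests) {
            out.add(request.join());
        }
        return out;
    }

    // Unordered: registers callbacks that record each result at the moment it completes.
    static List<String> collectAsCompleted(List<CompletableFuture<String>> requests) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        for (CompletableFuture<String> request : requests) {
            request.thenAccept(out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        List<CompletableFuture<String>> requests = Arrays.asList(a, b, c);

        List<String> unordered = collectAsCompleted(requests);
        // Simulate responses arriving out of order.
        c.complete("c");
        a.complete("a");
        b.complete("b");

        System.out.println(unordered);                // prints [c, a, b]
        System.out.println(orderedResults(requests)); // prints [a, b, c]
    }
}
```

Ordered emission preserves stream order at the cost of holding back results that finished early. Unordered emission trades that ordering for lower latency.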

### Sources and Sinks

Built-in and custom data sources for ingesting data into streams, and sinks for outputting processed results to external systems.

```java { .api }
// Built-in sources
DataStream<String> elements = env.fromElements("a", "b", "c");
DataStream<String> collection = env.fromCollection(Arrays.asList("x", "y", "z"));
DataStream<String> socket = env.socketTextStream("localhost", 9999);
DataStream<String> file = env.readTextFile("path/file.txt");

// Custom sources
DataStream<T> custom = env.addSource(SourceFunction<T> sourceFunction);

// Built-in sinks
stream.print();
stream.writeAsText("output/path");

// Custom sinks
stream.addSink(SinkFunction<T> sinkFunction);
```

[Sources and Sinks](./sources-sinks.md)

### Time and Watermarks

Event time processing with watermark generation for handling out-of-order events and late data in streaming applications.

```java { .api }
// Set time characteristic (deprecated since Flink 1.12, where event time is the default)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// Watermark strategies
WatermarkStrategy<T> strategy = WatermarkStrategy
    .<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((element, timestamp) -> element.getTimestamp());

// Assign watermarks
SingleOutputStreamOperator<T> withWatermarks = stream
    .assignTimestampsAndWatermarks(strategy);
```

[Time and Watermarks](./time-watermarks.md)
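
The arithmetic behind a bounded-out-of-orderness watermark can be sketched without any Flink dependency. The model below assumes the watermark trails the highest timestamp seen so far by the configured bound, minus one millisecond, so that a watermark of `t` means no further events with timestamp `<= t` are expected. The class name and methods are hypothetical.

```java
// Hypothetical model of a bounded-out-of-orderness watermark; not part of the Flink API.
class BoundedOutOfOrdernessSketch {

    private final long maxOutOfOrdernessMs;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that the initial watermark is Long.MIN_VALUE without overflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    // Records an event timestamp; only the highest timestamp seen so far matters.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Assumption: watermark = highest timestamp seen - bound - 1 ms.
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }

    // True if an event with this timestamp would arrive at or behind the current watermark.
    boolean isBehindWatermark(long eventTimestamp) {
        return eventTimestamp <= currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch sketch = new BoundedOutOfOrdernessSketch(5_000L);
        sketch.onEvent(10_000L);
        sketch.onEvent(12_000L);
        sketch.onEvent(11_000L); // out of order, does not move the watermark
        System.out.println(sketch.currentWatermark());        // prints 6999
        System.out.println(sketch.isBehindWatermark(6_000L)); // prints true
        System.out.println(sketch.isBehindWatermark(8_000L)); // prints false
    }
}
```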

### Checkpointing and Fault Tolerance

Configuration and management of checkpoints for fault-tolerant stream processing with exactly-once guarantees.

```java { .api }
// Enable checkpointing
env.enableCheckpointing(5000); // checkpoint every 5 seconds

// Configure checkpointing
CheckpointConfig config = env.getCheckpointConfig();
config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
config.setMinPauseBetweenCheckpoints(500); // milliseconds
config.setCheckpointTimeout(60000);        // milliseconds
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
```

[Checkpointing and Fault Tolerance](./checkpointing.md)

## Types

### Core Stream Types

```java { .api }
// Main stream types
class DataStream<T> {
    // Transformation methods
    <R> DataStream<R> map(MapFunction<T, R> mapper);
    DataStream<T> filter(FilterFunction<T> filter);
    <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper);
    <K> KeyedStream<T, K> keyBy(KeySelector<T, K> keySelector);
    DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
}

class KeyedStream<T, K> {
    // Stateful operations
    DataStream<T> reduce(ReduceFunction<T> reducer);
    // AggregateFunction-based aggregation is available on WindowedStream, not on KeyedStream
    <R> DataStream<R> process(KeyedProcessFunction<K, T, R> function);
    WindowedStream<T, K, GlobalWindow> countWindow(long size);
    // Deprecated since Flink 1.12 in favor of window(...) with an explicit assigner
    WindowedStream<T, K, TimeWindow> timeWindow(Time size);
}

class SingleOutputStreamOperator<T> extends DataStream<T> {
    // Operator configuration
    SingleOutputStreamOperator<T> name(String name);
    SingleOutputStreamOperator<T> uid(String uid);
    SingleOutputStreamOperator<T> setParallelism(int parallelism);
}
```

### Environment and Configuration

```java { .api }
class StreamExecutionEnvironment {
    // Factory methods
    static StreamExecutionEnvironment getExecutionEnvironment();
    static StreamExecutionEnvironment createLocalEnvironment();

    // Source creation
    <T> DataStreamSource<T> fromElements(T... data);
    <T> DataStreamSource<T> addSource(SourceFunction<T> function);
    DataStreamSource<String> socketTextStream(String hostname, int port);

    // Execution
    JobExecutionResult execute() throws Exception;
    JobExecutionResult execute(String jobName) throws Exception;

    // Configuration
    StreamExecutionEnvironment setParallelism(int parallelism);
    StreamExecutionEnvironment enableCheckpointing(long interval);
    CheckpointConfig getCheckpointConfig();
}
```

### Function Interfaces

```java { .api }
// Core function interfaces
interface MapFunction<T, O> extends Function {
    O map(T value) throws Exception;
}

interface FilterFunction<T> extends Function {
    boolean filter(T value) throws Exception;
}

interface FlatMapFunction<T, O> extends Function {
    void flatMap(T value, Collector<O> out) throws Exception;
}

interface ReduceFunction<T> extends Function {
    T reduce(T value1, T value2) throws Exception;
}

interface KeySelector<IN, KEY> extends Function {
    KEY getKey(IN value) throws Exception;
}

// Rich processing functions
abstract class ProcessFunction<I, O> extends AbstractRichFunction {
    abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}

abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception;
}
```
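
The cardinality contracts of these interfaces (map emits exactly one output per input, filter keeps or drops each input, flatMap emits zero or more) can be exercised with small stand-in interfaces in plain Java. The nested interfaces below are hypothetical simplifications: they omit `throws Exception` for brevity and use `java.util.function.Consumer` in place of Flink's `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for the function contracts; not part of the Flink API.
class FunctionContractSketch {

    interface MapFn<T, O> { O map(T value); }                           // exactly one output per input
    interface FilterFn<T> { boolean filter(T value); }                  // keep (true) or drop (false)
    interface FlatMapFn<T, O> { void flatMap(T value, Consumer<O> out); } // zero or more outputs

    // Splits lines into words, drops empty words, and upper-cases the rest.
    static List<String> run(List<String> lines) {
        FlatMapFn<String, String> splitWords = (line, out) -> {
            for (String word : line.split(" ")) {
                out.accept(word);
            }
        };
        FilterFn<String> nonEmpty = word -> !word.isEmpty();
        MapFn<String, String> upper = String::toUpperCase;

        List<String> result = new ArrayList<>();
        for (String line : lines) {
            List<String> words = new ArrayList<>();
            splitWords.flatMap(line, words::add);
            for (String word : words) {
                if (nonEmpty.filter(word)) {
                    result.add(upper.map(word));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello world", "", "flink"))); // prints [HELLO, WORLD, FLINK]
    }
}
```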