0
# DataStream API (Traditional)
1
2
The traditional DataStream API provides comprehensive stream processing capabilities: transformations, windowing, keyed state, timers, and asynchronous I/O. Complex event processing is available through libraries such as Flink CEP, which build on it. This is the stable, production-ready API used in most Flink applications.
3
4
## Capabilities
5
6
### Execution Environment
7
8
Main entry point for traditional DataStream programs.
9
10
```java { .api }
11
/**
12
* Main entry point for streaming programs
13
*/
14
class StreamExecutionEnvironment {
15
/**
16
* Get execution environment with default configuration
17
* @return Execution environment
18
*/
19
public static StreamExecutionEnvironment getExecutionEnvironment();
20
21
/**
22
* Create local execution environment
23
* @return Local execution environment
24
*/
25
public static StreamExecutionEnvironment createLocalEnvironment();
26
27
/**
28
* Add source function to create data stream
29
* @param function Source function
30
* @param <T> Element type
31
* @return DataStream with elements from source
32
*/
33
public <T> DataStream<T> addSource(SourceFunction<T> function);
34
35
/**
36
* Create data stream from collection
37
* @param data Collection of elements
38
* @param <T> Element type
39
* @return DataStream with elements
40
*/
41
public <T> DataStream<T> fromCollection(Collection<T> data);
42
43
/**
44
* Create data stream from elements
45
* @param data Varargs elements
46
* @param <T> Element type
47
* @return DataStream with elements
48
*/
49
public <T> DataStream<T> fromElements(@SuppressWarnings("unchecked") T... data);
50
51
/**
52
* Execute the streaming program
53
* @return Job execution result
54
* @throws Exception
55
*/
56
public JobExecutionResult execute() throws Exception;
57
58
/**
59
* Execute with job name
60
* @param jobName Job name
61
* @return Job execution result
62
* @throws Exception
63
*/
64
public JobExecutionResult execute(String jobName) throws Exception;
65
66
/**
67
* Set parallelism for operations
68
* @param parallelism Parallelism level
69
*/
70
public void setParallelism(int parallelism);
71
72
/**
73
* Enable checkpointing
74
* @param interval Checkpoint interval in milliseconds
75
*/
76
public void enableCheckpointing(long interval);
77
}
78
```
79
80
### DataStream Operations
81
82
Core stream transformation operations.
83
84
```java { .api }
85
/**
86
* Core stream abstraction for traditional API
87
* @param <T> Element type
88
*/
89
class DataStream<T> {
90
/**
91
* Apply map transformation
92
* @param mapper Map function
93
* @param <R> Result type
94
* @return Transformed stream
95
*/
96
public <R> DataStream<R> map(MapFunction<T, R> mapper);
97
98
/**
99
* Apply flatMap transformation
100
* @param flatMapper FlatMap function
101
* @param <R> Result type
102
* @return Transformed stream
103
*/
104
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper);
105
106
/**
107
* Filter elements
108
* @param filter Filter function
109
* @return Filtered stream
110
*/
111
public DataStream<T> filter(FilterFunction<T> filter);
112
113
/**
114
* Partition stream by key
115
* @param keySelector Key selector function
116
* @param <K> Key type
117
* @return Keyed stream
118
*/
119
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> keySelector);
120
121
/**
122
* Connect with another stream
123
* @param dataStream Other stream
124
* @param <T2> Other stream type
125
* @return Connected streams
126
*/
127
public <T2> ConnectedStreams<T, T2> connect(DataStream<T2> dataStream);
128
129
/**
130
* Union with other streams
131
* @param streams Other streams
132
* @return Union of streams
133
*/
134
public DataStream<T> union(DataStream<T>... streams);
135
136
/**
137
* Apply process function
138
* @param processFunction Process function
139
* @param <R> Result type
140
* @return Processed stream
141
*/
142
public <R> DataStream<R> process(ProcessFunction<T, R> processFunction);
143
144
/**
145
* Add sink
146
* @param sinkFunction Sink function
147
* @return Data stream sink
148
*/
149
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction);
150
151
/**
152
* Print elements to output
153
* @return Data stream sink
154
*/
155
public DataStreamSink<T> print();
156
}
157
```
158
159
### Keyed Stream Operations
160
161
Operations available on keyed streams for stateful processing.
162
163
```java { .api }
164
/**
165
* Keyed stream for stateful operations
166
* @param <T> Element type
167
* @param <K> Key type
168
*/
169
class KeyedStream<T, K> {
170
/**
171
* Reduce elements by key
172
* @param reducer Reduce function
173
* @return Stream with reduced elements
174
*/
175
public DataStream<T> reduce(ReduceFunction<T> reducer);
176
177
/**
178
* Apply process function with key context
179
* @param keyedProcessFunction Keyed process function
180
* @param <R> Result type
181
* @return Processed stream
182
*/
183
public <R> DataStream<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction);
184
185
/**
186
* Create windowed stream
187
* @param assigner Window assigner
188
* @param <W> Window type
189
* @return Windowed stream
190
*/
191
public <W extends Window> WindowedStream<T, K, W> window(WindowAssigner<? super T, W> assigner);
192
193
/**
194
* Create time windowed stream
195
* @param assigner Time window assigner
196
* @return Time windowed stream
197
*/
198
public WindowedStream<T, K, TimeWindow> timeWindow(Time size);
199
200
/**
201
* Create sliding time windowed stream
202
* @param size Window size
203
* @param slide Slide interval
204
* @return Time windowed stream
205
*/
206
public WindowedStream<T, K, TimeWindow> timeWindow(Time size, Time slide);
207
}
208
```
209
210
### Windowed Stream Operations
211
212
Operations on windowed streams.
213
214
```java { .api }
215
/**
216
* Windowed stream operations
217
* @param <T> Element type
218
* @param <K> Key type
219
* @param <W> Window type
220
*/
221
class WindowedStream<T, K, W extends Window> {
222
/**
223
* Reduce elements within windows
224
* @param function Reduce function
225
* @return Stream with window results
226
*/
227
public DataStream<T> reduce(ReduceFunction<T> function);
228
229
/**
230
* Aggregate elements within windows
231
* @param function Aggregate function
232
* @param <ACC> Accumulator type
233
* @param <R> Result type
234
* @return Stream with aggregated results
235
*/
236
public <ACC, R> DataStream<R> aggregate(AggregateFunction<T, ACC, R> function);
237
238
/**
239
* Apply window function
240
* @param function Window function
241
* @param <R> Result type
242
* @return Stream with window results
243
*/
244
public <R> DataStream<R> apply(WindowFunction<T, R, K, W> function);
245
246
/**
247
* Apply process window function
248
* @param function Process window function
249
* @param <R> Result type
250
* @return Stream with processed results
251
*/
252
public <R> DataStream<R> process(ProcessWindowFunction<T, R, K, W> function);
253
254
/**
255
* Set window trigger
256
* @param trigger Window trigger
257
* @return Windowed stream with trigger
258
*/
259
public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger);
260
261
/**
262
* Set window evictor
263
* @param evictor Window evictor
264
* @return Windowed stream with evictor
265
*/
266
public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor);
267
}
268
```
269
270
### Process Functions
271
272
Process function interfaces for custom processing logic.
273
274
```java { .api }
275
/**
276
* Process function for single input streams
277
* @param <I> Input type
278
* @param <O> Output type
279
*/
280
abstract class ProcessFunction<I, O> {
281
/**
282
* Process element
283
* @param value Input element
284
* @param ctx Process context
285
* @param out Output collector
286
* @throws Exception
287
*/
288
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
289
290
/**
291
* Process timer
292
* @param timestamp Timer timestamp
293
* @param ctx OnTimer context
294
* @param out Output collector
295
* @throws Exception
296
*/
297
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
298
299
/**
300
* Process context
301
*/
302
public abstract class Context {
303
/**
304
* Get current element timestamp
305
* @return Element timestamp
306
*/
307
public abstract Long timestamp();
308
309
/**
310
* Register processing time timer
311
* @param timestamp Timer timestamp
312
*/
313
public abstract void timerService().registerProcessingTimeTimer(long timestamp);
314
315
/**
316
* Output to side output
317
* @param outputTag Output tag
318
* @param value Value to output
319
* @param <X> Value type
320
*/
321
public abstract <X> void output(OutputTag<X> outputTag, X value);
322
}
323
}
324
325
/**
326
* Keyed process function
327
* @param <K> Key type
328
* @param <I> Input type
329
* @param <O> Output type
330
*/
331
abstract class KeyedProcessFunction<K, I, O> {
332
/**
333
* Process element with key context
334
* @param value Input element
335
* @param ctx Keyed context
336
* @param out Output collector
337
* @throws Exception
338
*/
339
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
340
341
/**
342
* Process timer with key context
343
* @param timestamp Timer timestamp
344
* @param ctx OnTimer context
345
* @param out Output collector
346
* @throws Exception
347
*/
348
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
349
350
/**
351
* Keyed process context
352
*/
353
public abstract class Context {
354
/**
355
* Get current key
356
* @return Current key
357
*/
358
public abstract K getCurrentKey();
359
360
/**
361
* Get timer service
362
* @return Timer service
363
*/
364
public abstract TimerService timerService();
365
}
366
}
367
```
368
369
### Supporting Types
370
371
Supporting types and interfaces for the traditional DataStream API.
372
373
```java { .api }
374
/**
375
* Source function interface for creating data streams
376
* @param <T> Element type
377
*/
378
interface SourceFunction<T> {
379
/**
380
* Run source function
381
* @param ctx Source context
382
* @throws Exception
383
*/
384
void run(SourceContext<T> ctx) throws Exception;
385
386
/**
387
* Cancel source function
388
*/
389
void cancel();
390
391
/**
392
* Source context for emitting elements
393
* @param <T> Element type
394
*/
395
interface SourceContext<T> {
396
/**
397
* Collect element
398
* @param element Element to collect
399
*/
400
void collect(T element);
401
402
/**
403
* Collect element with timestamp
404
* @param element Element to collect
405
* @param timestamp Element timestamp
406
*/
407
void collectWithTimestamp(T element, long timestamp);
408
409
/**
410
* Emit watermark
411
* @param mark Watermark
412
*/
413
void emitWatermark(Watermark mark);
414
415
/**
416
* Mark source as temporarily idle
417
*/
418
void markAsTemporarilyIdle();
419
420
/**
421
* Get checkpoint lock
422
* @return Checkpoint lock
423
*/
424
Object getCheckpointLock();
425
426
/**
427
* Close the source context
428
*/
429
void close();
430
}
431
}
432
433
/**
 * Function that consumes the elements of a stream, typically writing them to an external system.
 *
 * @param <IN> type of the consumed elements
 */
interface SinkFunction<IN> {
    /**
     * Writes one element.
     *
     * @param value the element to write
     * @param context information about the element and the current time
     * @throws Exception if writing the element fails
     */
    void invoke(IN value, Context context) throws Exception;

    /** Information available to a sink for the element being written. */
    interface Context {
        /**
         * Returns the current processing time.
         *
         * @return current processing time
         */
        long currentProcessingTime();

        /**
         * Returns the current event-time watermark.
         *
         * @return current watermark
         */
        long currentWatermark();

        /**
         * Returns the timestamp of the element being written.
         *
         * @return the element timestamp, or {@code null} if it carries none
         */
        Long timestamp();
    }
}

/**
471
* Data stream sink
472
* @param <T> Element type
473
*/
474
class DataStreamSink<T> {
475
/**
476
* Set sink parallelism
477
* @param parallelism Parallelism level
478
* @return Data stream sink
479
*/
480
public DataStreamSink<T> setParallelism(int parallelism);
481
482
/**
483
* Disable chaining for this sink
484
* @return Data stream sink
485
*/
486
public DataStreamSink<T> disableChaining();
487
488
/**
489
* Set slot sharing group
490
* @param slotSharingGroup Slot sharing group
491
* @return Data stream sink
492
*/
493
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup);
494
495
/**
496
* Set sink name
497
* @param name Sink name
498
* @return Data stream sink
499
*/
500
public DataStreamSink<T> name(String name);
501
}
502
503
/**
504
* Job execution result
505
*/
506
class JobExecutionResult {
507
/**
508
* Get job execution time
509
* @return Execution time in milliseconds
510
*/
511
public long getNetRuntime();
512
513
/**
514
* Get accumulator result
515
* @param accumulatorName Accumulator name
516
* @param <T> Result type
517
* @return Accumulator result
518
*/
519
public <T> T getAccumulatorResult(String accumulatorName);
520
521
/**
522
* Get all accumulator results
523
* @return Map of accumulator results
524
*/
525
public Map<String, Object> getAllAccumulatorResults();
526
}
527
528
/**
 * Access to the current time and to timer registration for process functions.
 *
 * <p>Timers can be set in processing time or in event time (driven by watermarks).
 */
interface TimerService {
    /**
     * Returns the current processing (wall-clock) time.
     *
     * @return current processing time
     */
    long currentProcessingTime();

    /**
     * Returns the current event-time watermark.
     *
     * @return current watermark
     */
    long currentWatermark();

    /**
     * Registers a timer that fires once processing time reaches {@code timestamp}.
     *
     * @param timestamp when the timer should fire
     */
    void registerProcessingTimeTimer(long timestamp);

    /**
     * Registers a timer that fires once the watermark reaches {@code timestamp}.
     *
     * @param timestamp when the timer should fire
     */
    void registerEventTimeTimer(long timestamp);

    /**
     * Removes a previously registered processing-time timer.
     *
     * @param timestamp the timestamp the timer was registered for
     */
    void deleteProcessingTimeTimer(long timestamp);

    /**
     * Removes a previously registered event-time timer.
     *
     * @param timestamp the timestamp the timer was registered for
     */
    void deleteEventTimeTimer(long timestamp);
}

/**
570
* Connected streams for processing two input streams
571
* @param <IN1> First input type
572
* @param <IN2> Second input type
573
*/
574
class ConnectedStreams<IN1, IN2> {
575
/**
576
* Apply co-map function
577
* @param coMapper Co-map function
578
* @param <R> Result type
579
* @return Data stream
580
*/
581
public <R> DataStream<R> map(CoMapFunction<IN1, IN2, R> coMapper);
582
583
/**
584
* Apply co-flat-map function
585
* @param coFlatMapper Co-flat-map function
586
* @param <R> Result type
587
* @return Data stream
588
*/
589
public <R> DataStream<R> flatMap(CoFlatMapFunction<IN1, IN2, R> coFlatMapper);
590
591
/**
592
* Apply co-process function
593
* @param coProcessFunction Co-process function
594
* @param <R> Result type
595
* @return Data stream
596
*/
597
public <R> DataStream<R> process(CoProcessFunction<IN1, IN2, R> coProcessFunction);
598
}
599
```
600
601
### Function Interfaces
602
603
Additional function interfaces for stream processing.
604
605
```java { .api }
606
/**
607
* Co-map function for connected streams
608
* @param <IN1> First input type
609
* @param <IN2> Second input type
610
* @param <OUT> Output type
611
*/
612
interface CoMapFunction<IN1, IN2, OUT> extends Function {
613
/**
614
* Map function for first input
615
* @param value Value from first input
616
* @return Mapped value
617
* @throws Exception
618
*/
619
OUT map1(IN1 value) throws Exception;
620
621
/**
622
* Map function for second input
623
* @param value Value from second input
624
* @return Mapped value
625
* @throws Exception
626
*/
627
OUT map2(IN2 value) throws Exception;
628
}
629
630
/**
631
* Co-flat-map function for connected streams
632
* @param <IN1> First input type
633
* @param <IN2> Second input type
634
* @param <OUT> Output type
635
*/
636
interface CoFlatMapFunction<IN1, IN2, OUT> extends Function {
637
/**
638
* Flat-map function for first input
639
* @param value Value from first input
640
* @param out Output collector
641
* @throws Exception
642
*/
643
void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
644
645
/**
646
* Flat-map function for second input
647
* @param value Value from second input
648
* @param out Output collector
649
* @throws Exception
650
*/
651
void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
652
}
653
654
/**
655
* Co-process function for connected streams
656
* @param <IN1> First input type
657
* @param <IN2> Second input type
658
* @param <OUT> Output type
659
*/
660
abstract class CoProcessFunction<IN1, IN2, OUT> {
661
/**
662
* Process element from first input
663
* @param value Element from first input
664
* @param ctx Process context
665
* @param out Output collector
666
* @throws Exception
667
*/
668
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
669
670
/**
671
* Process element from second input
672
* @param value Element from second input
673
* @param ctx Process context
674
* @param out Output collector
675
* @throws Exception
676
*/
677
public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
678
679
/**
680
* Process timer
681
* @param timestamp Timer timestamp
682
* @param ctx OnTimer context
683
* @param out Output collector
684
* @throws Exception
685
*/
686
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
687
688
/**
689
* Process context
690
*/
691
public abstract class Context {
692
/**
693
* Get timer service
694
* @return Timer service
695
*/
696
public abstract TimerService timerService();
697
698
/**
699
* Get current timestamp
700
* @return Current timestamp
701
*/
702
public abstract Long timestamp();
703
704
/**
705
* Output to side output
706
* @param outputTag Output tag
707
* @param value Value to output
708
* @param <X> Value type
709
*/
710
public abstract <X> void output(OutputTag<X> outputTag, X value);
711
}
712
713
/**
714
* OnTimer context
715
*/
716
public abstract class OnTimerContext extends Context {
717
/**
718
* Get timer timestamp
719
* @return Timer timestamp
720
*/
721
public abstract Long timestamp();
722
723
/**
724
* Get timer domain
725
* @return Timer domain
726
*/
727
public abstract TimeDomain timeDomain();
728
}
729
}
730
731
/** The notion of time a timer is bound to. */
enum TimeDomain {
    /** Timer driven by event time, i.e. by watermarks. */
    EVENT_TIME,

    /** Timer driven by the wall clock of the processing machine. */
    PROCESSING_TIME;
}
```
741
742
### Asynchronous Operations
743
744
Support for asynchronous I/O operations in stream processing.
745
746
```java { .api }
747
/**
748
* Function interface for asynchronous operations
749
* @param <IN> Input type
750
* @param <OUT> Output type
751
*/
752
interface AsyncFunction<IN, OUT> extends Function {
753
/**
754
* Trigger async operation for input element
755
* @param input Input element
756
* @param resultFuture Future to complete with results
757
* @throws Exception
758
*/
759
void asyncInvoke(IN input, ResultFuture<OUT> resultFuture) throws Exception;
760
761
/**
762
* Handle timeout for async operation (optional)
763
* @param input Input element that timed out
764
* @param resultFuture Future to complete with results
765
* @throws Exception
766
*/
767
default void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception {
768
resultFuture.completeExceptionally(
769
new TimeoutException("Async operation timed out"));
770
}
771
}
772
773
/**
774
* Rich async function with runtime context access
775
* @param <IN> Input type
776
* @param <OUT> Output type
777
*/
778
abstract class RichAsyncFunction<IN, OUT> extends AbstractRichFunction implements AsyncFunction<IN, OUT> {}
779
780
/**
781
* Utility class for applying async operations to data streams
782
*/
783
class AsyncDataStream {
784
/**
785
* Apply async function to stream with unordered output
786
* @param input Input stream
787
* @param function Async function
788
* @param timeout Operation timeout
789
* @param timeUnit Timeout time unit
790
* @param capacity Maximum number of concurrent async operations
791
* @param <IN> Input type
792
* @param <OUT> Output type
793
* @return Stream with async results
794
*/
795
public static <IN, OUT> DataStream<OUT> unorderedWait(
796
DataStream<IN> input,
797
AsyncFunction<IN, OUT> function,
798
long timeout,
799
TimeUnit timeUnit,
800
int capacity);
801
802
/**
803
* Apply async function to stream with unordered output (default capacity)
804
* @param input Input stream
805
* @param function Async function
806
* @param timeout Operation timeout
807
* @param timeUnit Timeout time unit
808
* @param <IN> Input type
809
* @param <OUT> Output type
810
* @return Stream with async results
811
*/
812
public static <IN, OUT> DataStream<OUT> unorderedWait(
813
DataStream<IN> input,
814
AsyncFunction<IN, OUT> function,
815
long timeout,
816
TimeUnit timeUnit);
817
818
/**
819
* Apply async function to stream with ordered output
820
* @param input Input stream
821
* @param function Async function
822
* @param timeout Operation timeout
823
* @param timeUnit Timeout time unit
824
* @param capacity Maximum number of concurrent async operations
825
* @param <IN> Input type
826
* @param <OUT> Output type
827
* @return Stream with async results (order preserved)
828
*/
829
public static <IN, OUT> DataStream<OUT> orderedWait(
830
DataStream<IN> input,
831
AsyncFunction<IN, OUT> function,
832
long timeout,
833
TimeUnit timeUnit,
834
int capacity);
835
836
/**
837
* Apply async function to stream with ordered output (default capacity)
838
* @param input Input stream
839
* @param function Async function
840
* @param timeout Operation timeout
841
* @param timeUnit Timeout time unit
842
* @param <IN> Input type
843
* @param <OUT> Output type
844
* @return Stream with async results (order preserved)
845
*/
846
public static <IN, OUT> DataStream<OUT> orderedWait(
847
DataStream<IN> input,
848
AsyncFunction<IN, OUT> function,
849
long timeout,
850
TimeUnit timeUnit);
851
}
852
853
/**
 * Handle through which an {@link AsyncFunction} delivers the outcome of one asynchronous call.
 *
 * <p>Complete it exactly once per input element, either with results or exceptionally. To emit
 * nothing for an element, complete it with an empty collection.
 *
 * @param <OUT> result type
 */
interface ResultFuture<OUT> {
    /**
     * Completes the future with all results produced for the input element.
     *
     * @param result the results; may be empty, in which case nothing is emitted
     */
    void complete(Collection<OUT> result);

    /**
     * Completes the future with a single result, by wrapping it in a singleton list and
     * delegating to {@link #complete(Collection)}.
     *
     * <p>Passing a bare {@code null} literal to {@code complete} can be ambiguous between the two
     * overloads; cast it to the intended parameter type.
     * NOTE(review): confirm this convenience overload exists in the targeted Flink version.
     *
     * @param result the single result value
     */
    default void complete(OUT result) {
        complete(Collections.singletonList(result));
    }

    /**
     * Completes the future with an error instead of results.
     *
     * @param error the cause of the failure
     */
    void completeExceptionally(Throwable error);
}
```