# DataStream API (New v2)

The v2 DataStream API is the next-generation DataStream API, with improved type safety, better performance, and enhanced functionality. This experimental API provides a streamlined programming model for stream processing applications, with cleaner abstractions and more intuitive operation semantics.

## Capabilities

### Execution Environment

Main entry point for creating DataStream programs with the new v2 API.

```java { .api }
/**
12
* Main entry point for DataStream programs using the new v2 API
13
*/
14
interface ExecutionEnvironment {
15
/**
16
* Create data stream from source
17
* @param source Data source
18
* @param <T> Element type
19
* @return DataStream with elements from source
20
*/
21
<T> DataStream<T> fromSource(Source<T> source);
22
23
/**
24
* Create data stream from collection
25
* @param collection Collection of elements
26
* @param <T> Element type
27
* @return DataStream with elements from collection
28
*/
29
<T> DataStream<T> fromCollection(Collection<T> collection);
30
31
/**
32
* Create data stream from elements
33
* @param elements Varargs elements
34
* @param <T> Element type
35
* @return DataStream with provided elements
36
*/
37
<T> DataStream<T> fromElements(T... elements);
38
39
/**
40
* Execute the streaming program
41
* @return Job execution result
42
*/
43
CompletableFuture<JobExecutionResult> execute();
44
45
/**
46
* Execute the streaming program with job name
47
* @param jobName Name of the job
48
* @return Job execution result
49
*/
50
CompletableFuture<JobExecutionResult> execute(String jobName);
51
}
```

### Stream Types

Core stream abstractions in the v2 API, providing different partitioning and processing semantics.

```java { .api }
/**
60
* Fundamental data stream interface
61
* @param <T> Element type
62
*/
63
interface DataStream<T> {
64
/**
65
* Apply process function to transform elements
66
* @param processFunction Process function
67
* @param <OUT> Output type
68
* @return Transformed stream
69
*/
70
<OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);
71
72
/**
73
* Partition stream by key
74
* @param keySelector Key extraction function
75
* @param <K> Key type
76
* @return Keyed partitioned stream
77
*/
78
<K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);
79
80
/**
81
* Connect with another stream for dual-input processing
82
* @param other Other stream
83
* @param <T2> Other stream element type
84
* @return Connected streams
85
*/
86
<T2> TwoInputConnectedStreams<T, T2> connectWith(DataStream<T2> other);
87
88
/**
89
* Broadcast stream to all parallel instances
90
* @return Broadcast stream
91
*/
92
BroadcastStream<T> broadcast();
93
94
/**
95
* Send all elements to single parallel instance
96
* @return Global stream
97
*/
98
GlobalStream<T> global();
99
100
/**
101
* Add sink to consume stream elements
102
* @param sink Data sink
103
* @return Sink transformation
104
*/
105
DataStreamSink<T> sinkTo(Sink<T> sink);
106
}
/**
109
* Keyed and partitioned stream for stateful operations
110
* @param <K> Key type
111
* @param <T> Element type
112
*/
113
interface KeyedPartitionStream<K, T> {
114
/**
115
* Apply keyed process function
116
* @param processFunction Keyed process function
117
* @param <OUT> Output type
118
* @return Transformed stream
119
*/
120
<OUT> DataStream<OUT> process(KeyedProcessFunction<K, T, OUT> processFunction);
121
122
/**
123
* Reduce elements by key using reduce function
124
* @param reduceFunction Reduce function
125
* @return Stream with reduced elements
126
*/
127
DataStream<T> reduce(ReduceFunction<T> reduceFunction);
128
129
/**
130
* Aggregate elements by key using aggregate function
131
* @param aggregateFunction Aggregate function
132
* @param <ACC> Accumulator type
133
* @param <OUT> Output type
134
* @return Stream with aggregated elements
135
*/
136
<ACC, OUT> DataStream<OUT> aggregate(AggregateFunction<T, ACC, OUT> aggregateFunction);
137
}
/**
140
* Non-keyed partitioned stream
141
* @param <T> Element type
142
*/
143
interface NonKeyedPartitionStream<T> {
144
/**
145
* Apply process function to stream
146
* @param processFunction Process function
147
* @param <OUT> Output type
148
* @return Transformed stream
149
*/
150
<OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);
151
}
/**
154
* Broadcast stream that sends elements to all parallel instances
155
* @param <T> Element type
156
*/
157
interface BroadcastStream<T> {
158
/**
159
* Connect with keyed stream for broadcast processing
160
* @param keyedStream Keyed stream
161
* @param <K> Key type
162
* @param <KS> Keyed stream element type
163
* @return Broadcast connected stream
164
*/
165
<K, KS> BroadcastConnectedStream<KS, T> connectWith(KeyedPartitionStream<K, KS> keyedStream);
166
167
/**
168
* Connect with non-keyed stream for broadcast processing
169
* @param stream Non-keyed stream
170
* @param <S> Stream element type
171
* @return Broadcast connected stream
172
*/
173
<S> BroadcastConnectedStream<S, T> connectWith(NonKeyedPartitionStream<S> stream);
174
}
/**
177
* Global stream processed by single parallel instance
178
* @param <T> Element type
179
*/
180
interface GlobalStream<T> {
181
/**
182
* Apply process function to global stream
183
* @param processFunction Process function
184
* @param <OUT> Output type
185
* @return Transformed stream
186
*/
187
<OUT> DataStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);
188
}
```

### Process Functions

Process function interfaces for the v2 API, providing flexible stream processing capabilities.

```java { .api }
/**
 * Base interface for process functions. It declares no methods of its own.
 */
interface ProcessFunction {}
/**
202
* Single input stream processing function
203
* @param <IN> Input type
204
* @param <OUT> Output type
205
*/
206
interface OneInputStreamProcessFunction<IN, OUT> extends ProcessFunction {
207
/**
208
* Process single element
209
* @param element Input element
210
* @param output Output collector
211
* @param ctx Runtime context
212
* @throws Exception
213
*/
214
void processElement(IN element, Collector<OUT> output, RuntimeContext ctx) throws Exception;
215
216
/**
217
* Process timer event
218
* @param timestamp Timer timestamp
219
* @param output Output collector
220
* @param ctx Runtime context
221
* @throws Exception
222
*/
223
default void onTimer(long timestamp, Collector<OUT> output, RuntimeContext ctx) throws Exception {}
224
}
/**
227
* Key-aware process function for keyed streams
228
* @param <K> Key type
229
* @param <IN> Input type
230
* @param <OUT> Output type
231
*/
232
interface KeyedProcessFunction<K, IN, OUT> extends ProcessFunction {
233
/**
234
* Process element with key context
235
* @param element Input element
236
* @param output Output collector
237
* @param ctx Partitioned context with key access
238
* @throws Exception
239
*/
240
void processElement(IN element, Collector<OUT> output, PartitionedContext<K> ctx) throws Exception;
241
242
/**
243
* Process timer with key context
244
* @param timestamp Timer timestamp
245
* @param output Output collector
246
* @param ctx Partitioned context with key access
247
* @throws Exception
248
*/
249
default void onTimer(long timestamp, Collector<OUT> output, PartitionedContext<K> ctx) throws Exception {}
250
}
/**
253
* Dual input stream processing function (non-broadcast)
254
* @param <IN1> First input type
255
* @param <IN2> Second input type
256
* @param <OUT> Output type
257
*/
258
interface TwoInputNonBroadcastStreamProcessFunction<IN1, IN2, OUT> extends ProcessFunction {
259
/**
260
* Process element from first input
261
* @param element Element from first input
262
* @param output Output collector
263
* @param ctx Runtime context
264
* @throws Exception
265
*/
266
void processElement1(IN1 element, Collector<OUT> output, RuntimeContext ctx) throws Exception;
267
268
/**
269
* Process element from second input
270
* @param element Element from second input
271
* @param output Output collector
272
* @param ctx Runtime context
273
* @throws Exception
274
*/
275
void processElement2(IN2 element, Collector<OUT> output, RuntimeContext ctx) throws Exception;
276
}
/**
279
* Multi-output stream processing function
280
* @param <IN> Input type
281
* @param <OUT1> First output type
282
* @param <OUT2> Second output type
283
*/
284
interface TwoOutputStreamProcessFunction<IN, OUT1, OUT2> extends ProcessFunction {
285
/**
286
* Process element producing multiple outputs
287
* @param element Input element
288
* @param output1 First output collector
289
* @param output2 Second output collector
290
* @param ctx Runtime context
291
* @throws Exception
292
*/
293
void processElement(IN element, Collector<OUT1> output1, Collector<OUT2> output2, RuntimeContext ctx) throws Exception;
294
}
```

### Context Interfaces

Context interfaces providing access to runtime information and services in the v2 API.

```java { .api }
/**
303
* Runtime context for accessing runtime information
304
*/
305
interface RuntimeContext {
306
/**
307
* Get task name
308
* @return Task name
309
*/
310
String getTaskName();
311
312
/**
313
* Get parallelism of current operator
314
* @return Parallelism
315
*/
316
int getParallelism();
317
318
/**
319
* Get index of current parallel subtask
320
* @return Subtask index
321
*/
322
int getIndexOfThisSubtask();
323
324
/**
325
* Get processing time manager
326
* @return Processing time manager
327
*/
328
ProcessingTimeManager getProcessingTimeManager();
329
330
/**
331
* Get state manager
332
* @return State manager
333
*/
334
StateManager getStateManager();
335
}
/**
338
* Context for partitioned (keyed) processing
339
* @param <K> Key type
340
*/
341
interface PartitionedContext<K> extends RuntimeContext {
342
/**
343
* Get current key
344
* @return Current processing key
345
*/
346
K getCurrentKey();
347
348
/**
349
* Get keyed state manager
350
* @return Keyed state manager
351
*/
352
KeyedStateManager getKeyedStateManager();
353
}
/**
356
* Interface for state management
357
*/
358
interface StateManager {
359
/**
360
* Get value state
361
* @param descriptor State descriptor
362
* @param <T> Value type
363
* @return Value state
364
*/
365
<T> ValueState<T> getState(ValueStateDescriptor<T> descriptor);
366
367
/**
368
* Get list state
369
* @param descriptor State descriptor
370
* @param <T> Element type
371
* @return List state
372
*/
373
<T> ListState<T> getListState(ListStateDescriptor<T> descriptor);
374
375
/**
376
* Get map state
377
* @param descriptor State descriptor
378
* @param <UK> User key type
379
* @param <UV> User value type
380
* @return Map state
381
*/
382
<UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> descriptor);
383
}
/**
386
* Interface for keyed state management
387
*/
388
interface KeyedStateManager extends StateManager {
389
/**
390
* Get reducing state
391
* @param descriptor State descriptor
392
* @param <T> Element type
393
* @return Reducing state
394
*/
395
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> descriptor);
396
397
/**
398
* Get aggregating state
399
* @param descriptor State descriptor
400
* @param <IN> Input type
401
* @param <ACC> Accumulator type
402
* @param <OUT> Output type
403
* @return Aggregating state
404
*/
405
<IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> descriptor);
406
}
/**
 * Interface for processing time management.
 */
interface ProcessingTimeManager {
    /**
     * Returns the current processing time.
     *
     * @return current processing time timestamp
     */
    long getCurrentProcessingTime();

    /**
     * Registers a processing time timer.
     *
     * @param timestamp timer timestamp
     */
    void registerTimer(long timestamp);

    /**
     * Deletes a processing time timer.
     *
     * @param timestamp timer timestamp
     */
    void deleteTimer(long timestamp);
}
```

### Windowing Extension

Windowing support for the v2 API is provided through extensions.

```java { .api }
/**
438
* Base window strategy interface
439
* @param <W> Window type
440
*/
441
interface WindowStrategy<W> {
442
/**
443
* Assign windows to element
444
* @param element Element to assign windows to
445
* @param timestamp Element timestamp
446
* @return Collection of windows
447
*/
448
Collection<W> assignWindows(Object element, long timestamp);
449
450
/**
451
* Get window serializer
452
* @return Window serializer
453
*/
454
TypeSerializer<W> getWindowSerializer();
455
}
/**
458
* Tumbling time window strategy
459
*/
460
class TumblingTimeWindowStrategy implements WindowStrategy<TimeWindow> {
461
/**
462
* Create tumbling time windows with specified size
463
* @param size Window size
464
* @return Tumbling window strategy
465
*/
466
public static TumblingTimeWindowStrategy of(Duration size);
467
468
/**
469
* Create tumbling time windows with size and offset
470
* @param size Window size
471
* @param offset Window offset
472
* @return Tumbling window strategy
473
*/
474
public static TumblingTimeWindowStrategy of(Duration size, Duration offset);
475
}
/**
478
* Sliding time window strategy
479
*/
480
class SlidingTimeWindowStrategy implements WindowStrategy<TimeWindow> {
481
/**
482
* Create sliding time windows with size and slide
483
* @param size Window size
484
* @param slide Slide interval
485
* @return Sliding window strategy
486
*/
487
public static SlidingTimeWindowStrategy of(Duration size, Duration slide);
488
489
/**
490
* Create sliding time windows with size, slide, and offset
491
* @param size Window size
492
* @param slide Slide interval
493
* @param offset Window offset
494
* @return Sliding window strategy
495
*/
496
public static SlidingTimeWindowStrategy of(Duration size, Duration slide, Duration offset);
497
}
/**
500
* Session window strategy
501
*/
502
class SessionWindowStrategy implements WindowStrategy<TimeWindow> {
503
/**
504
* Create session windows with inactivity gap
505
* @param gap Inactivity gap
506
* @return Session window strategy
507
*/
508
public static SessionWindowStrategy withGap(Duration gap);
509
}
/**
512
* Time window implementation
513
*/
514
class TimeWindow {
515
/**
516
* Get window start time
517
* @return Start timestamp
518
*/
519
public long getStart();
520
521
/**
522
* Get window end time
523
* @return End timestamp
524
*/
525
public long getEnd();
526
527
/**
528
* Get window maximum timestamp
529
* @return Maximum timestamp
530
*/
531
public long maxTimestamp();
532
533
/**
534
* Check if window contains timestamp
535
* @param timestamp Timestamp to check
536
* @return true if timestamp is in window
537
*/
538
public boolean contains(long timestamp);
539
}
```

### Event Time Extension

Event time processing support is provided through extensions.

```java { .api }
/**
548
* Event time extension for DataStream API v2
549
*/
550
class EventTimeExtension {
551
/**
552
* Enable event time processing for stream
553
* @param stream Input stream
554
* @param <T> Element type
555
* @return Stream with event time support
556
*/
557
public static <T> DataStream<T> withEventTime(DataStream<T> stream);
558
559
/**
560
* Assign timestamps and watermarks
561
* @param stream Input stream
562
* @param timestampAssigner Timestamp assigner
563
* @param <T> Element type
564
* @return Stream with timestamps and watermarks
565
*/
566
public static <T> DataStream<T> assignTimestampsAndWatermarks(
567
DataStream<T> stream,
568
TimestampAssigner<T> timestampAssigner
569
);
570
}
/**
 * Interface for event time management.
 */
interface EventTimeManager {
    /**
     * Returns the current event time.
     *
     * @return current event time
     */
    long getCurrentEventTime();

    /**
     * Registers an event time timer.
     *
     * @param timestamp timer timestamp
     */
    void registerTimer(long timestamp);

    /**
     * Deletes an event time timer.
     *
     * @param timestamp timer timestamp
     */
    void deleteTimer(long timestamp);
}
/**
 * Interface for assigning timestamps to elements.
 *
 * @param <T> element type
 */
interface TimestampAssigner<T> {
    /**
     * Extracts the timestamp from an element.
     *
     * @param element element
     * @param recordTimestamp record timestamp
     * @return element timestamp
     */
    long extractTimestamp(T element, long recordTimestamp);
}
```