# Runtime Operators

A comprehensive set of operators for joins, aggregations, window operations, sorting, ranking, and other table-processing tasks, optimized for both streaming and batch execution modes.

## Capabilities

### Operator Factories

The main factories for creating runtime operators, including code-generated operators and specialized operator types.

```java { .api }
/**
12
* Main factory for code-generated operators
13
* Creates optimized operators from generated code for maximum performance
14
*/
15
class CodeGenOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OUT> {
16
/** Create factory with generated operator class */
17
CodeGenOperatorFactory(GeneratedClass<? extends StreamOperator<OUT>> operatorCodeGenerator);
18
19
/** Create the actual operator instance */
20
<T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters);
21
22
/** Get operator class */
23
Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader);
24
}
25
26
/** Factory for watermark assignment operators */
27
class WatermarkAssignerOperatorFactory extends AbstractStreamOperatorFactory<RowData> {
28
WatermarkAssignerOperatorFactory(
29
int rowtimeFieldIndex,
30
WatermarkStrategy<RowData> watermarkStrategy
31
);
32
}
33
34
/** Factory for multi-input operators in batch mode */
35
class BatchMultipleInputStreamOperatorFactory<OUT>
36
extends AbstractStreamOperatorFactory<OUT> {
37
38
BatchMultipleInputStreamOperatorFactory(
39
GeneratedClass<? extends MultipleInputStreamOperator<OUT>> generatedClass
40
);
41
}
42
```

### Window Operations

Window-processing support, including builders that configure different window types and window assigners.

```java { .api }
/**
50
* Builder class for window operators
51
* Provides fluent API for constructing window processing operators
52
*/
53
class WindowOperatorBuilder {
54
/** Create new builder instance */
55
static WindowOperatorBuilder builder();
56
57
/** Configure tumbling window */
58
WindowOperatorBuilder tumble(Duration size);
59
60
/** Configure sliding window */
61
WindowOperatorBuilder sliding(Duration size, Duration slide);
62
63
/** Configure event time processing */
64
WindowOperatorBuilder withEventTime(int rowtimeIndex);
65
66
/** Configure processing time */
67
WindowOperatorBuilder withProcessingTime();
68
69
/** Set window assigner */
70
WindowOperatorBuilder withWindowAssigner(WindowAssigner<?, ? extends Window> assigner);
71
72
/** Set window function */
73
WindowOperatorBuilder withWindowFunction(WindowFunction<?, ?, ?, ?> function);
74
75
/** Set trigger */
76
WindowOperatorBuilder withTrigger(Trigger<?, ?> trigger);
77
78
/** Set evictor */
79
WindowOperatorBuilder withEvictor(Evictor<?, ?> evictor);
80
81
/** Set allowed lateness */
82
WindowOperatorBuilder withAllowedLateness(Time allowedLateness);
83
84
/** Build the window operator */
85
OneInputStreamOperator<RowData, RowData> build();
86
}
87
88
/**
89
* Builder for slicing window aggregation operators
90
* Optimized for non-overlapping window aggregations
91
*/
92
class SlicingWindowAggOperatorBuilder {
93
SlicingWindowAggOperatorBuilder(
94
GeneratedAggsHandleFunction aggsHandleFunction,
95
WindowAssigner<?, ?> windowAssigner,
96
Trigger<?, ?> trigger
97
);
98
99
/** Set state backend */
100
SlicingWindowAggOperatorBuilder withStateBackend(StateBackend stateBackend);
101
102
/** Build the operator */
103
OneInputStreamOperator<RowData, RowData> build();
104
}
105
106
/**
107
* Builder for window-based ranking operators
108
* Handles ranking within window boundaries
109
*/
110
class WindowRankOperatorBuilder {
111
WindowRankOperatorBuilder(
112
WindowAssigner<?, ?> windowAssigner,
113
GeneratedRecordComparator comparator,
114
RankType rankType,
115
long rankStart,
116
long rankEnd
117
);
118
119
/** Build the window rank operator */
120
OneInputStreamOperator<RowData, RowData> build();
121
}
122
```

### Window Assigners

Window assigners for different windowing strategies, including tumbling, sliding, and session windows.

```java { .api }
/** Tumbling window assigner */
130
class TumblingWindowAssigner extends WindowAssigner<Object, TimeWindow> {
131
TumblingWindowAssigner(long windowSize, long offset);
132
133
/** Assign windows to element */
134
Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);
135
}
136
137
/** Sliding window assigner */
138
class SlidingWindowAssigner extends WindowAssigner<Object, TimeWindow> {
139
SlidingWindowAssigner(long windowSize, long slide, long offset);
140
141
/** Assign windows to element */
142
Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);
143
}
144
145
/** Session window assigner */
146
class SessionWindowAssigner extends WindowAssigner<Object, TimeWindow> {
147
SessionWindowAssigner(long sessionTimeout);
148
149
/** Assign windows to element */
150
Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context);
151
}
152
```

### Window Types

Window implementations for time-based and count-based windowing operations.

```java { .api }
/** Time-based window implementation */
160
class TimeWindow extends Window {
161
/** Window start timestamp */
162
public final long start;
163
164
/** Window end timestamp */
165
public final long end;
166
167
/** Create time window */
168
TimeWindow(long start, long end);
169
170
/** Get window size */
171
public long getSize();
172
173
/** Check if window covers timestamp */
174
public boolean covers(long timestamp);
175
}
176
177
/** Count-based window implementation */
178
class CountWindow extends Window {
179
/** Window ID */
180
public final long id;
181
182
/** Create count window */
183
CountWindow(long id);
184
}
185
```

### Join Operations

Join-processing support, including join-type enumerations, join strategies, and specialized join operators.

```java { .api }
/** Join type enumeration */
193
enum FlinkJoinType {
194
INNER, LEFT, RIGHT, FULL, SEMI, ANTI;
195
196
/** Check if join type is outer */
197
boolean isOuter();
198
199
/** Check if join type filters nulls */
200
boolean filtersNulls();
201
}
202
203
/** Hash join type enumeration */
204
enum HashJoinType {
205
INNER, BUILD_LEFT, BUILD_RIGHT, FULL_OUTER, SEMI, ANTI;
206
207
/** Get corresponding Flink join type */
208
FlinkJoinType toFlinkJoinType();
209
}
210
211
/** Sort-merge join operator implementation */
212
class SortMergeJoinOperator extends AbstractStreamOperator<RowData>
213
implements TwoInputStreamOperator<RowData, RowData, RowData> {
214
215
SortMergeJoinOperator(
216
FlinkJoinType joinType,
217
GeneratedJoinCondition joinCondition,
218
GeneratedRecordComparator leftComparator,
219
GeneratedRecordComparator rightComparator
220
);
221
222
/** Process left input */
223
void processElement1(StreamRecord<RowData> element) throws Exception;
224
225
/** Process right input */
226
void processElement2(StreamRecord<RowData> element) throws Exception;
227
}
228
229
/** Streaming join operator */
230
class StreamingJoinOperator extends AbstractStreamOperator<RowData>
231
implements TwoInputStreamOperator<RowData, RowData, RowData> {
232
233
StreamingJoinOperator(
234
FlinkJoinType joinType,
235
GeneratedJoinCondition joinCondition,
236
long leftLowerBound,
237
long leftUpperBound,
238
long rightLowerBound,
239
long rightUpperBound
240
);
241
}
242
243
/** Streaming semi/anti join operator */
244
class StreamingSemiAntiJoinOperator extends AbstractStreamOperator<RowData>
245
implements TwoInputStreamOperator<RowData, RowData, RowData> {
246
247
StreamingSemiAntiJoinOperator(
248
boolean isAntiJoin,
249
GeneratedJoinCondition joinCondition,
250
long minRetentionTime,
251
long maxRetentionTime
252
);
253
}
254
```

### Lookup Joins

Specialized join operators for lookups against external systems and dimension tables.

```java { .api }
/** Async lookup join runner */
262
class AsyncLookupJoinRunner {
263
AsyncLookupJoinRunner(
264
GeneratedFunction<AsyncTableFunction<Object>> generatedFetcher,
265
DataStructureConverter<RowData, Object> fetcherConverter,
266
DataStructureConverter<Object, RowData> lookupResultConverter,
267
GeneratedResultFuture<Object> generatedResultFuture,
268
boolean isLeftOuterJoin
269
);
270
271
/** Process input row with async lookup */
272
void processElement(RowData input, Collector<RowData> out) throws Exception;
273
}
274
275
/** Sync lookup join runner */
276
class LookupJoinRunner {
277
LookupJoinRunner(
278
GeneratedFunction<TableFunction<Object>> generatedFetcher,
279
DataStructureConverter<RowData, Object> fetcherConverter,
280
DataStructureConverter<Object, RowData> lookupResultConverter,
281
boolean isLeftOuterJoin
282
);
283
284
/** Process input row with sync lookup */
285
void processElement(RowData input, Collector<RowData> out) throws Exception;
286
}
287
288
/** Lookup join with calc runner */
289
class LookupJoinWithCalcRunner {
290
LookupJoinWithCalcRunner(
291
GeneratedFunction<TableFunction<Object>> generatedFetcher,
292
DataStructureConverter<RowData, Object> fetcherConverter,
293
GeneratedProjection projection,
294
boolean isLeftOuterJoin
295
);
296
}
297
```

### Aggregation Operations

Aggregation operators for group-by operations, including support for mini-batch processing and table aggregations.

```java { .api }
/** Group aggregation function */
305
class GroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
306
GroupAggFunction(
307
GeneratedAggsHandleFunction genAggsHandler,
308
LogicalType[] accTypes,
309
int indexOfCountStar,
310
boolean generateUpdateBefore,
311
boolean needRetract
312
);
313
314
/** Process element in group aggregation */
315
void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;
316
}
317
318
/** Group table aggregation function */
319
class GroupTableAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
320
GroupTableAggFunction(
321
GeneratedTableAggsHandleFunction genAggsHandler,
322
LogicalType[] accTypes,
323
int indexOfCountStar,
324
boolean generateUpdateBefore
325
);
326
}
327
328
/** Mini-batch global group aggregation function */
329
class MiniBatchGlobalGroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
330
MiniBatchGlobalGroupAggFunction(
331
GeneratedAggsHandleFunction genAggsHandler,
332
LogicalType[] accTypes,
333
int indexOfCountStar,
334
boolean generateUpdateBefore,
335
long miniBatchSize
336
);
337
}
338
339
/** Mini-batch group aggregation function */
340
class MiniBatchGroupAggFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
341
MiniBatchGroupAggFunction(
342
GeneratedAggsHandleFunction genAggsHandler,
343
LogicalType[] accTypes,
344
int indexOfCountStar,
345
boolean generateUpdateBefore,
346
long miniBatchSize
347
);
348
}
349
```

### Sorting Operations

Operators for sorting data, including full sort, sort with limit, and limit-only operations.

```java { .api }
/** Sort operator implementation */
357
class SortOperator extends AbstractStreamOperator<RowData>
358
implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
359
360
SortOperator(GeneratedRecordComparator gComparator);
361
362
/** Process input element */
363
void processElement(StreamRecord<RowData> element) throws Exception;
364
365
/** End input processing and emit sorted results */
366
void endInput() throws Exception;
367
}
368
369
/** Sort with limit operator */
370
class SortLimitOperator extends AbstractStreamOperator<RowData>
371
implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
372
373
SortLimitOperator(
374
boolean isGlobal,
375
long limitStart,
376
long limitEnd,
377
GeneratedRecordComparator gComparator
378
);
379
}
380
381
/** Limit operator implementation */
382
class LimitOperator extends AbstractStreamOperator<RowData>
383
implements OneInputStreamOperator<RowData, RowData> {
384
385
LimitOperator(boolean isGlobal, long limitStart, long limitEnd);
386
387
/** Process input element with limit */
388
void processElement(StreamRecord<RowData> element) throws Exception;
389
}
390
```

### Ranking Operations

Operators for ranking and Top-N queries, including different ranking strategies and buffer management.

```java { .api }
/** Ranking operator implementation */
398
class RankOperator extends KeyedProcessFunction<RowData, RowData, RowData> {
399
RankOperator(
400
GeneratedRecordComparator sortKeyComparator,
401
RankType rankType,
402
long rankStart,
403
long rankEnd,
404
boolean generateUpdateBefore,
405
boolean outputRankNumber
406
);
407
408
/** Process element for ranking */
409
void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;
410
}
411
412
/** Buffer for top-N operations */
413
class TopNBuffer {
414
TopNBuffer(
415
GeneratedRecordComparator sortKeyComparator,
416
ArrayList<RowData> buffer,
417
RankType rankType,
418
long rankStart,
419
long rankEnd
420
);
421
422
/** Put record into buffer */
423
List<RowData> put(RowData record);
424
425
/** Remove record from buffer */
426
List<RowData> remove(RowData record);
427
428
/** Get current rankings */
429
Iterator<Map.Entry<RowData, Long>> getIterator();
430
}
431
432
/** Append-only top-N function */
433
class AppendOnlyTopNFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
434
AppendOnlyTopNFunction(
435
GeneratedRecordComparator sortKeyComparator,
436
RankType rankType,
437
long rankStart,
438
long rankEnd,
439
boolean generateUpdateBefore,
440
boolean outputRankNumber
441
);
442
}
443
444
/** Retractable top-N function */
445
class RetractableTopNFunction extends KeyedProcessFunction<RowData, RowData, RowData> {
446
RetractableTopNFunction(
447
GeneratedRecordComparator sortKeyComparator,
448
RankType rankType,
449
long rankStart,
450
long rankEnd,
451
boolean generateUpdateBefore,
452
boolean outputRankNumber
453
);
454
}
455
```

### Hash Table Operations

Hash table implementations for join and aggregation operations, optimized for different data types and use cases.

```java { .api }
/** Hash table for long keys */
463
class LongHashTable {
464
LongHashTable(int capacity, double loadFactor);
465
466
/** Put key-value pair */
467
boolean put(long key, RowData value);
468
469
/** Get value by key */
470
RowData get(long key);
471
472
/** Remove key-value pair */
473
RowData remove(long key);
474
475
/** Check if contains key */
476
boolean contains(long key);
477
478
/** Get current size */
479
int size();
480
}
481
482
/** Binary hash table implementation */
483
class BinaryHashTable {
484
BinaryHashTable(
485
MemoryManager memManager,
486
long memorySize,
487
LogicalType[] keyTypes,
488
LogicalType[] valueTypes
489
);
490
491
/** Open the hash table */
492
void open() throws IOException;
493
494
/** Put binary row */
495
boolean put(BinaryRowData key, BinaryRowData value) throws IOException;
496
497
/** Get binary row by key */
498
BinaryRowData get(BinaryRowData key) throws IOException;
499
500
/** Close and clean up */
501
void close();
502
}
503
```

### Deduplication Operations

Operators that remove duplicate records in both processing-time and event-time scenarios.

```java { .api }
/** Base class for processing time deduplication functions */
511
abstract class ProcTimeDeduplicateFunctionBase extends KeyedProcessFunction<RowData, RowData, RowData> {
512
ProcTimeDeduplicateFunctionBase(
513
long minRetentionTime,
514
long maxRetentionTime,
515
boolean generateUpdateBefore,
516
boolean keepLastRow
517
);
518
519
/** Process element for deduplication */
520
void processElement(RowData value, Context ctx, Collector<RowData> out) throws Exception;
521
}
522
523
/** Processing time mini-batch deduplicate function for keeping first row */
524
class ProcTimeMiniBatchDeduplicateKeepFirstRowFunction
525
extends ProcTimeDeduplicateFunctionBase {
526
527
ProcTimeMiniBatchDeduplicateKeepFirstRowFunction(
528
long minRetentionTime,
529
long maxRetentionTime,
530
boolean generateUpdateBefore,
531
long miniBatchSize
532
);
533
}
534
```

## Usage Examples

```java
// Create window operator using builder
540
WindowOperatorBuilder builder = new WindowOperatorBuilder()
541
.withWindowAssigner(new TumblingWindowAssigner(Duration.ofMinutes(5).toMillis(), 0))
542
.withWindowFunction(new MyWindowFunction())
543
.withTrigger(EventTimeTrigger.create())
544
.withAllowedLateness(Time.seconds(10));
545
546
OneInputStreamOperator<RowData, RowData> windowOperator = builder.build();
547
548
// Create join operator
549
SortMergeJoinOperator joinOperator = new SortMergeJoinOperator(
550
FlinkJoinType.INNER,
551
generatedJoinCondition,
552
leftComparator,
553
rightComparator
554
);
555
556
// Create aggregation function
557
GroupAggFunction aggFunction = new GroupAggFunction(
558
generatedAggsHandler,
559
accTypes,
560
indexOfCountStar,
561
true, // generateUpdateBefore
562
false // needRetract
563
);
564
565
// Create ranking operator
566
RankOperator rankOperator = new RankOperator(
567
sortKeyComparator,
568
RankType.ROW_NUMBER,
569
1L, // rankStart
570
10L, // rankEnd
571
true, // generateUpdateBefore
572
true // outputRankNumber
573
);
574
```