0
# Code Generation Framework
1
2
A framework for generating optimized runtime code — aggregation functions, join conditions, projections, and other operations — to maximize performance through compile-time optimization.
3
4
## Capabilities
5
6
### Generated Class Framework
7
8
Base infrastructure for all generated classes, providing the foundation for compile-time code generation and runtime class loading.
9
10
```java { .api }
11
/**
12
* Base class for all generated classes
13
* Provides foundation for compile-time code generation and runtime class loading
14
*/
15
abstract class GeneratedClass<T> {
16
/** Get the class name of the generated class */
17
String getClassName();
18
19
/** Get the generated Java source code */
20
String getCode();
21
22
/** Get references to external objects used in generated code */
23
Object[] getReferences();
24
25
/** Compile and instantiate the generated class */
26
T newInstance(ClassLoader classLoader) throws Exception;
27
}
28
```
29
30
### Generated Function Interfaces
31
32
Core interfaces that generated functions must implement, defining the contract for various types of generated operations.
33
34
```java { .api }
35
/**
36
* Aggregation function interface for generated code
37
* Handles accumulation and retrieval of aggregated values
38
*/
39
interface AggsHandleFunction {
40
/** Set aggregation accumulator */
41
void setAccumulators(RowData acc) throws Exception;
42
43
/** Get current accumulator values */
44
RowData getAccumulators() throws Exception;
45
46
/** Accumulate input row */
47
void accumulate(RowData input) throws Exception;
48
49
/** Retract input row (for retracting streams) */
50
void retract(RowData input) throws Exception;
51
52
/** Merge accumulators from different partitions */
53
void merge(RowData otherAcc) throws Exception;
54
55
/** Reset accumulator to initial state */
56
void resetAccumulator() throws Exception;
57
58
/** Get final aggregation result */
59
RowData getValue() throws Exception;
60
61
/** Clean up expired state */
62
void cleanup() throws Exception;
63
64
/** Close and release resources */
65
void close() throws Exception;
66
}
67
68
/**
69
* Table aggregation function interface for generated code
70
* Handles table-valued aggregation functions (UDTAF)
71
*/
72
interface TableAggsHandleFunction {
73
/** Set table aggregation accumulator */
74
void setAccumulators(RowData acc) throws Exception;
75
76
/** Get current accumulator values */
77
RowData getAccumulators() throws Exception;
78
79
/** Accumulate input row */
80
void accumulate(RowData input) throws Exception;
81
82
/** Retract input row */
83
void retract(RowData input) throws Exception;
84
85
/** Emit aggregation results */
86
void emitValue(RowData acc, Collector<RowData> out) throws Exception;
87
88
/** Clean up expired state */
89
void cleanup() throws Exception;
90
91
/** Close and release resources */
92
void close() throws Exception;
93
}
94
95
/**
96
* Join condition interface for generated code
97
* Evaluates join predicates between left and right input rows
98
*/
99
interface JoinCondition {
100
/** Evaluate join condition */
101
boolean apply(RowData left, RowData right) throws Exception;
102
103
/** Close and release resources */
104
void close() throws Exception;
105
}
106
107
/**
108
* Projection interface for generated code
109
* Projects input rows to output rows with transformed fields
110
*/
111
interface Projection<IN, OUT> {
112
/** Apply projection to input row */
113
OUT apply(IN input) throws Exception;
114
115
/** Close and release resources */
116
void close() throws Exception;
117
}
118
119
/**
120
* Record comparator interface for generated code
121
* Compares two records for sorting and ranking operations
122
*/
123
interface RecordComparator {
124
/** Compare two records */
125
int compare(RowData o1, RowData o2);
126
127
/** Close and release resources */
128
void close() throws Exception;
129
}
130
131
/**
132
* Record equalizer interface for generated code
133
* Checks equality between two records
134
*/
135
interface RecordEqualiser {
136
/** Check if two records are equal */
137
boolean equals(RowData left, RowData right);
138
139
/** Close and release resources */
140
void close() throws Exception;
141
}
142
```
143
144
### Generated Class Wrappers
145
146
Wrapper classes that encapsulate generated code for different types of operations, providing type safety and lifecycle management.
147
148
```java { .api }
149
/** Wrapper for generated aggregation functions */
150
class GeneratedAggsHandleFunction extends GeneratedClass<AggsHandleFunction> {
151
GeneratedAggsHandleFunction(String className, String code, Object[] references);
152
153
/** Create new aggregation function instance */
154
AggsHandleFunction newInstance(ClassLoader classLoader) throws Exception;
155
}
156
157
/** Wrapper for generated table aggregation functions */
158
class GeneratedTableAggsHandleFunction extends GeneratedClass<TableAggsHandleFunction> {
159
GeneratedTableAggsHandleFunction(String className, String code, Object[] references);
160
161
/** Create new table aggregation function instance */
162
TableAggsHandleFunction newInstance(ClassLoader classLoader) throws Exception;
163
}
164
165
/** Wrapper for generated join conditions */
166
class GeneratedJoinCondition extends GeneratedClass<JoinCondition> {
167
GeneratedJoinCondition(String className, String code, Object[] references);
168
169
/** Create new join condition instance */
170
JoinCondition newInstance(ClassLoader classLoader) throws Exception;
171
}
172
173
/** Wrapper for generated projections */
174
class GeneratedProjection<IN, OUT> extends GeneratedClass<Projection<IN, OUT>> {
175
GeneratedProjection(String className, String code, Object[] references);
176
177
/** Create new projection instance */
178
Projection<IN, OUT> newInstance(ClassLoader classLoader) throws Exception;
179
}
180
181
/** Wrapper for generated record comparators */
182
class GeneratedRecordComparator extends GeneratedClass<RecordComparator> {
183
GeneratedRecordComparator(String className, String code, Object[] references);
184
185
/** Create new record comparator instance */
186
RecordComparator newInstance(ClassLoader classLoader) throws Exception;
187
}
188
189
/** Wrapper for generated record equalizers */
190
class GeneratedRecordEqualiser extends GeneratedClass<RecordEqualiser> {
191
GeneratedRecordEqualiser(String className, String code, Object[] references);
192
193
/** Create new record equalizer instance */
194
RecordEqualiser newInstance(ClassLoader classLoader) throws Exception;
195
}
196
```
197
198
### Advanced Generated Functions
199
200
Specialized generated function interfaces for more complex operations such as window functions, hash functions, and filter functions.
201
202
```java { .api }
203
/**
204
* Window function interface for generated code
205
* Processes elements within window boundaries
206
*/
207
interface WindowFunction<IN, OUT, KEY, W extends Window> {
208
/** Apply window function to all elements in window */
209
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
210
211
/** Close and release resources */
212
void close() throws Exception;
213
}
214
215
/**
216
* Hash function interface for generated code
217
* Computes hash codes for partitioning and bucketing
218
*/
219
interface HashFunction {
220
/** Compute hash code for input row */
221
int hashCode(RowData input);
222
223
/** Close and release resources */
224
void close() throws Exception;
225
}
226
227
/**
228
* Filter function interface for generated code
229
* Filters rows based on generated predicates
230
*/
231
interface FilterFunction {
232
/** Evaluate filter condition */
233
boolean filter(RowData input) throws Exception;
234
235
/** Close and release resources */
236
void close() throws Exception;
237
}
238
```
239
240
### Generated Wrapper Classes for Advanced Functions
241
242
Wrapper classes for advanced generated functions, providing type safety and lifecycle management.
243
244
```java { .api }
245
/** Wrapper for generated window functions */
246
class GeneratedWindowFunction<IN, OUT, KEY, W extends Window>
247
extends GeneratedClass<WindowFunction<IN, OUT, KEY, W>> {
248
249
GeneratedWindowFunction(String className, String code, Object[] references);
250
}
251
252
/** Wrapper for generated hash functions */
253
class GeneratedHashFunction extends GeneratedClass<HashFunction> {
254
GeneratedHashFunction(String className, String code, Object[] references);
255
}
256
257
/** Wrapper for generated filter functions */
258
class GeneratedFilterFunction extends GeneratedClass<FilterFunction> {
259
GeneratedFilterFunction(String className, String code, Object[] references);
260
}
261
262
/** Wrapper for generated result futures (used by async operations) */
263
class GeneratedResultFuture<T> extends GeneratedClass<ResultFuture<T>> {
264
GeneratedResultFuture(String className, String code, Object[] references);
265
}
266
```
267
268
### Code Generation Utilities
269
270
Interfaces supporting the code generation process, including the runtime context (`FunctionContext`) available to generated functions and the lifecycle contract (`GeneratedFunction`) for opening, obtaining, and closing them.
271
272
```java { .api }
273
/**
274
* Runtime context for generated functions
275
* Manages references and context for generated code execution
276
*/
277
interface FunctionContext {
278
/** Get reference by index */
279
<T> T getReference(int index);
280
281
/** Get metric group for monitoring */
282
MetricGroup getMetricGroup();
283
284
/** Get job parameters */
285
Configuration getJobParameters();
286
287
/** Get user class loader */
288
ClassLoader getUserCodeClassLoader();
289
}
290
291
/**
292
* Generated function with context
293
* Base interface for generated functions that need runtime context
294
*/
295
interface GeneratedFunction<T> {
296
/** Open function with context */
297
void open(FunctionContext context) throws Exception;
298
299
/** Get the actual function instance */
300
T getInstance();
301
302
/** Close and cleanup */
303
void close() throws Exception;
304
}
305
```
306
307
### Specialized Generated Operations
308
309
Generated classes for specialized operations such as key selectors, partition computers, serializers, and watermark generators.
310
311
```java { .api }
312
/** Wrapper for generated key selectors */
313
class GeneratedKeySelector<IN, KEY> extends GeneratedClass<KeySelector<IN, KEY>> {
314
GeneratedKeySelector(String className, String code, Object[] references);
315
}
316
317
/** Wrapper for generated partition computers */
318
class GeneratedPartitionComputer<T> extends GeneratedClass<PartitionComputer<T>> {
319
GeneratedPartitionComputer(String className, String code, Object[] references);
320
}
321
322
/** Wrapper for generated serializers */
323
class GeneratedSerializer<T> extends GeneratedClass<TypeSerializer<T>> {
324
GeneratedSerializer(String className, String code, Object[] references);
325
}
326
327
/** Wrapper for generated watermark generators */
328
class GeneratedWatermarkGenerator extends GeneratedClass<WatermarkGenerator<RowData>> {
329
GeneratedWatermarkGenerator(String className, String code, Object[] references);
330
}
331
```
332
333
### Code Generation Context
334
335
Context and configuration classes for managing the code generation process, including compilation settings and optimization parameters.
336
337
```java { .api }
338
/**
339
* Configuration for code generation
340
* Controls compilation settings and optimization parameters
341
*/
342
class CodeGeneratorContext {
343
/** Add reference object */
344
String addReusableReference(Object obj, String className);
345
346
/** Add reusable local variable */
347
String addReusableLocalVariable(String variableName, String initialValue);
348
349
/** Add reusable function */
350
String addReusableFunction(String function, String functionName);
351
352
/** Get generated references */
353
Object[] getReferences();
354
355
/** Set null check enabled */
356
void setNullCheckEnabled(boolean enabled);
357
358
/** Set operator context */
359
void setOperatorContext(StreamingRuntimeContext context);
360
}
361
362
/**
363
* Code generation utilities
364
* Provides helper methods for generating optimized code
365
*/
366
class CodeGenUtils {
367
/** Generate null-safe field access code */
368
static String genNullSafeFieldAccess(String input, int index, LogicalType type);
369
370
/** Generate type conversion code */
371
static String genTypeConversion(String input, LogicalType fromType, LogicalType toType);
372
373
/** Generate comparison code */
374
static String genComparison(String left, String right, LogicalType type);
375
376
/** Generate hash code computation */
377
static String genHashCode(String input, LogicalType type);
378
}
379
```
380
381
## Usage Examples
382
383
```java
384
// Create generated aggregation function
385
GeneratedAggsHandleFunction genAggFunction = new GeneratedAggsHandleFunction(
386
"MyAggFunction",
387
generatedCode,
388
references
389
);
390
391
// Instantiate the generated function
392
AggsHandleFunction aggFunction = genAggFunction.newInstance(classLoader);
393
394
// Use in aggregation operator
395
GroupAggFunction groupAggOperator = new GroupAggFunction(
396
genAggFunction,
397
accTypes,
398
indexOfCountStar,
399
generateUpdateBefore,
400
needRetract
401
);
402
403
// Create generated join condition
404
GeneratedJoinCondition genJoinCondition = new GeneratedJoinCondition(
405
"MyJoinCondition",
406
joinConditionCode,
407
joinReferences
408
);
409
410
// Use in join operator
411
SortMergeJoinOperator joinOperator = new SortMergeJoinOperator(
412
FlinkJoinType.INNER,
413
genJoinCondition,
414
leftComparator,
415
rightComparator
416
);
417
418
// Create generated projection
419
GeneratedProjection<RowData, RowData> genProjection = new GeneratedProjection<>(
420
"MyProjection",
421
projectionCode,
422
projectionReferences
423
);
424
425
// Use projection
426
Projection<RowData, RowData> projection = genProjection.newInstance(classLoader);
427
RowData output = projection.apply(input);
428
429
// Create code generation context
430
CodeGeneratorContext ctx = new CodeGeneratorContext();
431
ctx.setNullCheckEnabled(true);
432
ctx.addReusableReference(myObject, "MyClass");
433
434
// Generate optimized code with context
435
String optimizedCode = generateOptimizedFunction(ctx, expression);
436
```