# Code Generation

The code generation system produces Java source code specialized to each query and compiles it at runtime with the Janino compiler. It covers:

- calculations
- projections
- equality checks
- aggregations
- watermark generation
- hashing
- over-window bound comparisons

Running this specialized code avoids the overhead of interpreting expressions row by row.

## Capabilities

### CalcCodeGenerator - Calculation Code Generation

Generates code for general calculations, including expressions, filters, and projections.

```scala { .api }
/**
 * Code generator for calculation operations and expressions
 */
object CalcCodeGenerator {

  /**
   * Generates a calc operator for executing calculations and projections
   * @param ctx Code generation context
   * @param inputTransform Input transformation providing the data
   * @param outputType Output row type information
   * @param projection Sequence of RexNodes for projection
   * @param condition Optional condition for filtering
   * @param retainHeader Whether to copy the input row's header (its RowKind) to the output row
   * @param opName Name for the generated operator
   * @return Generated operator factory for calculation operations
   */
  def generateCalcOperator(
      ctx: CodeGeneratorContext,
      inputTransform: Transformation[RowData],
      outputType: RowType,
      projection: Seq[RexNode],
      condition: Option[RexNode],
      retainHeader: Boolean = false,
      opName: String
  ): CodeGenOperatorFactory[RowData]

  /**
   * Generates internal function for calculation with custom parameters (private API)
   * @param inputType Input row type
   * @param name Function name
   * @param returnType Return row type
   * @param outRowClass Output row data class
   * @param calcProjection Calculation projection nodes
   * @param calcCondition Optional calculation condition
   * @param config Table configuration
   * @return Generated function for flat map operations
   */
  private[flink] def generateFunction[T <: Function](
      inputType: RowType,
      name: String,
      returnType: RowType,
      outRowClass: Class[_ <: RowData],
      calcProjection: Seq[RexNode],
      calcCondition: Option[RexNode],
      config: TableConfig
  ): GeneratedFunction[FlatMapFunction[RowData, RowData]]
}
```
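
**Usage Example:**

`generateFunction` is `private[flink]`. The snippet below therefore only compiles from code inside the `org.apache.flink` package, such as planner internals or tests. The planner normally makes this call itself. The `???` values are placeholders for inputs the planner produces during optimization.

```scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.binary.BinaryRowData
import org.apache.flink.table.planner.codegen.CalcCodeGenerator
import org.apache.flink.table.types.logical.RowType
import org.apache.calcite.rex.RexNode

// Inputs normally produced by the planner during optimization (placeholders here)
val inputType: RowType = ???          // input schema
val outputType: RowType = ???         // output schema
val projection: Seq[RexNode] = ???    // one expression per output field
val condition: Option[RexNode] = None // no filter in this example
val tableConfig: TableConfig = TableConfig.getDefault()

// Arguments follow the generateFunction signature documented above
val generatedFunction = CalcCodeGenerator.generateFunction(
  inputType,
  "ExampleCalc",
  outputType,
  classOf[BinaryRowData],
  projection,
  condition,
  tableConfig
)

// Compile and instantiate, e.g. in a rich function's open() method
val classLoader = Thread.currentThread().getContextClassLoader
val calc: FlatMapFunction[RowData, RowData] = generatedFunction.newInstance(classLoader)
// For each input row: calc.flatMap(row, collector)
```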
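A generated calc function fuses the optional condition and the projection into a single `flatMap` body with inline null handling. The plain-Java sketch below imitates that shape for a hypothetical query `SELECT a + b, c FROM t WHERE a > 10`. It illustrates the idea under stated assumptions and is not Flink's generated code:

- `CalcSketch` is a made-up name.
- `Object[]` stands in for `RowData`.
- `java.util.function.Consumer` stands in for Flink's `Collector`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, hand-written stand-in for a generated calc class (not Flink's output).
// Models: SELECT a + b, c FROM t WHERE a > 10
// Rows are Object[] {Long a, Long b, String c} instead of RowData,
// and Consumer plays the role of Flink's Collector.
final class CalcSketch {
    // A single output row reused across calls, mirroring object reuse in generated code
    private final Object[] out = new Object[2];

    void flatMap(Object[] in, Consumer<Object[]> collector) {
        // Condition first: a NULL or non-matching row emits nothing
        Long a = (Long) in[0];
        if (a == null || a <= 10) {
            return;
        }
        // Projection: a + b is NULL when b is NULL, otherwise a primitive addition
        Long b = (Long) in[1];
        out[0] = (b == null) ? null : a + b;
        out[1] = in[2];
        collector.accept(out);
    }

    // Runs the calc over a list of rows, copying each result because `out` is reused
    static List<Object[]> run(List<Object[]> input) {
        CalcSketch calc = new CalcSketch();
        List<Object[]> result = new ArrayList<>();
        for (Object[] row : input) {
            calc.flatMap(row, r -> result.add(r.clone()));
        }
        return result;
    }
}
```

Filtering before evaluating the projection means rejected rows never pay for the projection's expressions.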

### ProjectionCodeGenerator - Projection Code Generation

Generates specialized code for column projections and field access operations.

```scala { .api }
/**
 * Code generator for projection operations
 */
object ProjectionCodeGenerator {

  /**
   * Generates projection code for accessing specific fields
   * @param ctx Code generation context
   * @param name Name for the generated projection
   * @param inType Input row type information
   * @param outType Output row type information
   * @param inputMapping Array of field indices to project
   * @param outClass Output row data class
   * @param inputTerm Input term name for code generation
   * @param outRecordTerm Output record term name
   * @param outRecordWriterTerm Output record writer term name
   * @param reusedOutRecord Whether to reuse output record instances
   * @return Generated projection function
   */
  def generateProjection(
      ctx: CodeGeneratorContext,
      name: String,
      inType: RowType,
      outType: RowType,
      inputMapping: Array[Int],
      outClass: Class[_ <: RowData] = classOf[BinaryRowData],
      inputTerm: String = DEFAULT_INPUT1_TERM,
      outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
      outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
      reusedOutRecord: Boolean = true
  ): GeneratedProjection

  /**
   * Generates projection expression for field transformation
   * @param ctx Code generation context
   * @param inType Input row type information
   * @param outType Output row type information
   * @param inputMapping Array of field indices to project
   * @param outClass Output row data class
   * @param inputTerm Input term name for code generation
   * @param outRecordTerm Output record term name
   * @param outRecordWriterTerm Output record writer term name
   * @param reusedOutRecord Whether to reuse output record instances
   * @return Generated expression for projection
   */
  def generateProjectionExpression(
      ctx: CodeGeneratorContext,
      inType: RowType,
      outType: RowType,
      inputMapping: Array[Int],
      outClass: Class[_ <: RowData] = classOf[BinaryRowData],
      inputTerm: String = DEFAULT_INPUT1_TERM,
      outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
      outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
      reusedOutRecord: Boolean = true
  ): GeneratedExpression
}

/**
 * Generated projection function interface
 */
trait GeneratedProjection {
  /**
   * Applies projection to input row data
   * @param input Input row data
   * @return Projected row data
   */
  def apply(input: RowData): RowData
}
```
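The effect of `inputMapping` and `reusedOutRecord` can be sketched in plain Java. `ProjectionSketch` is a hypothetical class, and `Object[]` stands in for `RowData`. The real generator emits one field-copy statement per output column and writes into `BinaryRowData` by default rather than into an array.

```java
// Hypothetical plain-Java stand-in for a generated projection (not Flink's output).
// Rows are Object[] instead of RowData. inputMapping[i] is the input field copied
// into output position i, matching the inputMapping parameter documented above.
final class ProjectionSketch {
    private final int[] inputMapping;
    private final boolean reusedOutRecord;
    private final Object[] reuse;

    ProjectionSketch(int[] inputMapping, boolean reusedOutRecord) {
        this.inputMapping = inputMapping.clone();
        this.reusedOutRecord = reusedOutRecord;
        this.reuse = new Object[inputMapping.length];
    }

    Object[] apply(Object[] input) {
        // Reuse one output row, or allocate a fresh one per call
        Object[] out = reusedOutRecord ? reuse : new Object[inputMapping.length];
        // A code generator would unroll this loop into one assignment per field
        for (int i = 0; i < inputMapping.length; i++) {
            out[i] = input[inputMapping[i]];
        }
        return out;
    }
}
```

The next call overwrites a reused output row. A caller that buffers projected rows must therefore copy them first.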

### EqualiserCodeGenerator - Equality Comparison Code Generation

Generates efficient code for equality comparisons and key extraction operations.

```scala { .api }
/**
 * Code generator for equality comparisons and key operations
 */
object EqualiserCodeGenerator {

  /**
   * Generates equaliser for comparing row data instances
   * @param fieldTypes Types of fields to compare
   * @param config Table configuration
   * @param classLoader Class loader for generated code
   * @return Generated equaliser function
   */
  def generateRecordEqualiser(
      fieldTypes: Array[DataType],
      config: TableConfig,
      classLoader: ClassLoader
  ): GeneratedRecordEqualiser

  /**
   * Generates key equaliser for join and aggregation operations
   * @param keyTypes Types of key fields
   * @param nullsAreEqual Whether null values should be considered equal
   * @return Generated key equaliser
   */
  def generateKeyEqualiser(
      keyTypes: Array[DataType],
      nullsAreEqual: Boolean
  ): GeneratedEqualiser
}

/**
 * Generated equaliser interface for row comparisons
 */
trait GeneratedRecordEqualiser {
  /**
   * Tests equality between two row data instances
   * @param left First row to compare
   * @param right Second row to compare
   * @return True if rows are equal, false otherwise
   */
  def equals(left: RowData, right: RowData): Boolean
}
```
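A generated equaliser boils down to a field-by-field comparison with explicit NULL handling. The plain-Java sketch below shows how the `nullsAreEqual` flag from `generateKeyEqualiser` changes the result. It is hypothetical: `EqualiserSketch` is not a Flink class, and `Object[]` stands in for `RowData`.

```java
import java.util.Objects;

// Hypothetical plain-Java stand-in for a generated record equaliser (not Flink's output).
// Rows are Object[] instead of RowData. Generated code would emit one typed comparison
// per field; this sketch loops and uses Objects.deepEquals so byte[] fields compare by content.
final class EqualiserSketch {
    private final boolean nullsAreEqual;

    EqualiserSketch(boolean nullsAreEqual) {
        this.nullsAreEqual = nullsAreEqual;
    }

    boolean equals(Object[] left, Object[] right) {
        if (left.length != right.length) {
            return false;
        }
        for (int i = 0; i < left.length; i++) {
            Object l = left[i];
            Object r = right[i];
            if (l == null || r == null) {
                // Two NULLs match only when nullsAreEqual is set; NULL never equals a value
                if (!(l == null && r == null && nullsAreEqual)) {
                    return false;
                }
            } else if (!Objects.deepEquals(l, r)) {
                return false;
            }
        }
        return true;
    }
}
```

The right setting depends on the operation. SQL `GROUP BY` puts all NULL keys into one group, while a standard equi-join condition `a = b` never matches NULL with NULL.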

### Aggregation Code Generation

Specialized code generation for aggregation operations including hash-based and sort-based aggregations.

```scala { .api }
/**
 * Code generator for aggregation handlers
 */
object AggsHandlerCodeGenerator {

  /**
   * Generates aggregation handler for processing aggregation functions
   * @param aggInfos Information about aggregation functions
   * @param inputType Input row type
   * @param grouping Grouping specification
   * @param config Table configuration
   * @return Generated aggregation handler
   */
  def generateAggsHandler(
      aggInfos: Array[AggregateInfo],
      inputType: RowType,
      grouping: Array[Int],
      config: TableConfig
  ): GeneratedAggsHandler
}

/**
 * Hash-based aggregation code generator
 */
object HashAggCodeGenerator {

  /**
   * Generates hash aggregation operator
   * @param aggInfos Aggregation function information
   * @param inputType Input data type
   * @param outputType Output data type
   * @param grouping Grouping key specification
   * @return Generated hash aggregation operator
   */
  def generate(
      aggInfos: Array[AggregateInfo],
      inputType: RowType,
      outputType: RowType,
      grouping: Array[Int]
  ): GeneratedOperator[OneInputStreamOperator[RowData, RowData]]
}

/**
 * Sort-based aggregation code generator
 */
object SortAggCodeGenerator {

  /**
   * Generates sort aggregation operator
   * @param aggInfos Aggregation function information
   * @param inputType Input data type
   * @param outputType Output data type
   * @param grouping Grouping specification
   * @param orderKeys Sort order specification
   * @return Generated sort aggregation operator
   */
  def generate(
      aggInfos: Array[AggregateInfo],
      inputType: RowType,
      outputType: RowType,
      grouping: Array[Int],
      orderKeys: Array[Int]
  ): GeneratedOperator[OneInputStreamOperator[RowData, RowData]]
}
```
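The two strategies differ in what they assume about the input:

- The hash variant keeps one accumulator per key and accepts any input order.
- The sort variant requires input sorted by key and holds only one accumulator at a time.

The following plain-Java sketch computes `SUM(value) GROUP BY key` both ways. It uses hypothetical names and `Object[]` rows of `{String key, Long value}` instead of `RowData`, and assumes keys and values are non-null. It is not Flink's generated operator code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch contrasting the two strategies (not Flink's generated operators).
// Computes SUM(v) GROUP BY k over rows {String k, Long v}; keys and values assumed non-null.
final class AggSketch {
    // Hash-based: one accumulator per key in a hash table; input order does not matter,
    // and results are emitted only after all input has been consumed.
    static List<String> hashAgg(List<Object[]> rows) {
        Map<String, Long> acc = new LinkedHashMap<>();
        for (Object[] row : rows) {
            acc.merge((String) row[0], (Long) row[1], Long::sum);
        }
        List<String> out = new ArrayList<>();
        acc.forEach((k, sum) -> out.add(k + "=" + sum));
        return out;
    }

    // Sort-based: input must already be sorted by key, so a single accumulator suffices
    // and each group is emitted as soon as the key changes.
    static List<String> sortAgg(List<Object[]> sortedRows) {
        List<String> out = new ArrayList<>();
        String currentKey = null;
        long sum = 0;
        for (Object[] row : sortedRows) {
            String key = (String) row[0];
            if (currentKey != null && !currentKey.equals(key)) {
                out.add(currentKey + "=" + sum); // key changed: flush the previous group
                sum = 0;
            }
            currentKey = key;
            sum += (Long) row[1];
        }
        if (currentKey != null) {
            out.add(currentKey + "=" + sum); // flush the last group
        }
        return out;
    }
}
```

The hash variant's memory grows with the number of distinct keys. The sort variant's memory stays constant but depends on a preceding sort.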

### WatermarkGeneratorCodeGen - Watermark Code Generation

Generates code for watermark generation in streaming scenarios.

```scala { .api }
/**
 * Code generator for watermark generation
 */
object WatermarkGeneratorCodeGen {

  /**
   * Generates watermark generator for streaming sources
   * @param rowtimeFieldIndex Index of rowtime field
   * @param watermarkExpr Watermark generation expression
   * @param inputType Input row type
   * @param config Table configuration
   * @return Generated watermark generator
   */
  def generateWatermarkGenerator(
      rowtimeFieldIndex: Int,
      watermarkExpr: RexNode,
      inputType: RowType,
      config: TableConfig
  ): GeneratedWatermarkGenerator
}

/**
 * Generated watermark generator interface
 */
trait GeneratedWatermarkGenerator {
  /**
   * Generates watermark for given row data
   * @param row Input row containing timestamp
   * @return Generated watermark timestamp
   */
  def currentWatermark(row: RowData): Long
}
```
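For a DDL clause such as `WATERMARK FOR ts AS ts - INTERVAL '5' SECOND`, the generated part evaluates the watermark expression for each row. The plain-Java sketch below makes two assumptions:

- The rowtime is a `Long` of epoch milliseconds at `rowtimeFieldIndex`.
- The surrounding operator keeps the maximum value seen so far, so watermarks never move backwards.

`WatermarkSketch` and `onRow` are hypothetical names, not Flink APIs.

```java
// Hypothetical plain-Java sketch (not Flink's generated class) for
// WATERMARK FOR ts AS ts - INTERVAL '5' SECOND, with ts as epoch millis in a Long field.
final class WatermarkSketch {
    private final int rowtimeFieldIndex;
    private final long delayMillis;
    private long currentMax = Long.MIN_VALUE;

    WatermarkSketch(int rowtimeFieldIndex, long delayMillis) {
        this.rowtimeFieldIndex = rowtimeFieldIndex;
        this.delayMillis = delayMillis;
    }

    // Counterpart of the generated currentWatermark(row): evaluate the expression only.
    // A NULL rowtime yields no watermark (null) in this sketch.
    Long currentWatermark(Object[] row) {
        Long ts = (Long) row[rowtimeFieldIndex];
        return ts == null ? null : ts - delayMillis;
    }

    // Assumed operator-side bookkeeping: keep the maximum so watermarks never decrease
    long onRow(Object[] row) {
        Long wm = currentWatermark(row);
        if (wm != null && wm > currentMax) {
            currentMax = wm;
        }
        return currentMax;
    }
}
```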

### HashCodeGenerator - Hash Code Generation

Generates efficient hash code computation for keys and records.

```scala { .api }
/**
 * Code generator for hash code computation
 */
object HashCodeGenerator {

  /**
   * Generates hash code computer for row data
   * @param fieldTypes Types of fields to hash
   * @param config Table configuration
   * @return Generated hash code computer
   */
  def generateRowHash(
      fieldTypes: Array[DataType],
      config: TableConfig
  ): GeneratedHashFunction

  /**
   * Generates hash code computer for key fields
   * @param keyTypes Types of key fields
   * @param keyIndices Indices of key fields in the row
   * @return Generated key hash function
   */
  def generateKeyHash(
      keyTypes: Array[DataType],
      keyIndices: Array[Int]
  ): GeneratedHashFunction
}

/**
 * Generated hash function interface
 */
trait GeneratedHashFunction {
  /**
   * Computes hash code for row data
   * @param row Row data to hash
   * @return Hash code value
   */
  def hashCode(row: RowData): Int
}
```
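A key hash only has to be consistent with key equality. Rows with equal key fields must produce equal hashes, and non-key fields must not influence the result. The plain-Java sketch below is hypothetical. The multiply-by-37-and-add combination and the value 0 for NULL are illustrative choices, not necessarily the constants Flink's generator emits.

```java
import java.util.Arrays;

// Hypothetical plain-Java sketch of a generated key hash (not Flink's output).
// Rows are Object[] instead of RowData; only the fields at keyIndices contribute.
final class KeyHashSketch {
    private final int[] keyIndices;

    KeyHashSketch(int[] keyIndices) {
        this.keyIndices = keyIndices.clone();
    }

    int hashCode(Object[] row) {
        int h = 0;
        for (int idx : keyIndices) {
            Object field = row[idx];
            int fieldHash;
            if (field == null) {
                fieldHash = 0; // assumed hash for NULL
            } else if (field instanceof byte[]) {
                fieldHash = Arrays.hashCode((byte[]) field); // hash BINARY by content
            } else {
                fieldHash = field.hashCode();
            }
            // Assumed combination step: multiply the running hash and add the field hash
            h = h * 37 + fieldHash;
        }
        return h;
    }
}
```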

### Over Window Code Generation

Specialized code generation for over window operations including range and row-based windows.

```scala { .api }
/**
 * Code generator for range-based window comparisons
 */
object RangeBoundComparatorCodeGenerator {

  /**
   * Generates range bound comparator for window operations
   * @param boundType Type of the bound field
   * @param isLowerBound Whether this is a lower bound comparator
   * @param config Table configuration
   * @return Generated range bound comparator
   */
  def generate(
      boundType: DataType,
      isLowerBound: Boolean,
      config: TableConfig
  ): GeneratedRecordComparator
}

/**
 * Multi-field range bound comparator generator
 */
object MultiFieldRangeBoundComparatorCodeGenerator {

  /**
   * Generates comparator for multi-field range bounds
   * @param orderKeys Array of order key information
   * @param boundTypes Types of bound fields
   * @param orders Sort orders for each field
   * @return Generated multi-field comparator
   */
  def generate(
      orderKeys: Array[Int],
      boundTypes: Array[DataType],
      orders: Array[SortDirection]
  ): GeneratedRecordComparator
}
```
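Consider a frame like `ORDER BY ts RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW`. Here the lower-bound comparator decides whether an earlier row still falls inside the current row's frame.

The plain-Java sketch below uses a `Long` order key at a fixed index, with the interval expressed in the key's unit. Its sign convention is an assumption: negative means the row lies before the frame start. `RangeBoundSketch` and `frameStarts` are hypothetical names.

Because keys are ascending, the frame start only moves forward, so the scan stays linear overall.

```java
import java.util.List;

// Hypothetical plain-Java sketch of a lower range-bound comparator (not Flink's output).
// Rows are Object[] instead of RowData; the order key is a Long at keyIndex.
final class RangeBoundSketch {
    private final int keyIndex;
    private final long preceding; // assumed >= 0, in the same unit as the key

    RangeBoundSketch(int keyIndex, long preceding) {
        this.keyIndex = keyIndex;
        this.preceding = preceding;
    }

    // < 0: input lies before the frame start of `current`; >= 0: at or after it.
    // Overflow of (cur - preceding) near Long.MIN_VALUE is ignored in this sketch.
    int compare(Object[] input, Object[] current) {
        long in = (Long) input[keyIndex];
        long cur = (Long) current[keyIndex];
        return Long.compare(in, cur - preceding);
    }

    // For rows sorted ascending by key, returns the index where each row's frame starts
    static int[] frameStarts(List<Object[]> ascending, RangeBoundSketch lower) {
        int[] starts = new int[ascending.size()];
        int start = 0;
        for (int i = 0; i < ascending.size(); i++) {
            // Keys ascend, so the frame start never moves backwards
            while (lower.compare(ascending.get(start), ascending.get(i)) < 0) {
                start++;
            }
            starts[i] = start;
        }
        return starts;
    }
}
```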

## Generated Code Interface

### GeneratedFunction

Base interface for all generated functions:

```scala { .api }
/**
 * Base interface for generated functions
 * @tparam F Type of the Flink function the generated class implements,
 *           e.g. FlatMapFunction[RowData, RowData]
 */
trait GeneratedFunction[F <: Function] {
  /**
   * Creates a new instance of the generated function
   * @param classLoader Class loader for instantiation
   * @return New function instance
   */
  def newInstance(classLoader: ClassLoader): F

  /**
   * Returns the generated code as a string (for debugging)
   * @return Generated Java code
   */
  def getCode: String

  /**
   * Returns the class name of the generated function
   * @return Generated class name
   */
  def getClassName: String
}
```
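`newInstance` is where compiled generated code becomes a usable object. The class is resolved by name through the supplied class loader and instantiated reflectively. The plain-Java sketch below shows only that loading step. It skips the Janino compilation of `getCode` that Flink performs first, and uses an ordinary nested class as the stand-in for generated code. `GeneratedFunctionSketch` and `UpperCaser` are hypothetical names.

```java
import java.util.Locale;
import java.util.function.Function;

// Hypothetical sketch of the newInstance(classLoader) step (not Flink's GeneratedClass).
// Flink compiles getCode() with Janino first; here the "generated" class is an ordinary,
// already-compiled nested class, so only load-by-name plus instantiation is shown.
final class GeneratedFunctionSketch<F> {
    private final String className;
    private final String code;

    GeneratedFunctionSketch(String className, String code) {
        this.className = className;
        this.code = code;
    }

    String getCode() {
        return code;
    }

    String getClassName() {
        return className;
    }

    @SuppressWarnings("unchecked")
    F newInstance(ClassLoader classLoader) {
        try {
            // Resolve through the supplied loader so classes visible to user code are found
            Class<?> clazz = Class.forName(className, true, classLoader);
            return (F) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not instantiate " + className, e);
        }
    }

    // Stand-in for a compiled generated class: public, with a public no-arg constructor
    public static final class UpperCaser implements Function<String, String> {
        public UpperCaser() {}

        @Override
        public String apply(String s) {
            return s.toUpperCase(Locale.ROOT);
        }
    }
}
```

Because instantiation takes a class loader argument, the same generated code can be shipped to each task and loaded there with the task's user-code class loader.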

### GeneratedOperator

Interface for generated Flink operators:

```scala { .api }
/**
 * Generated Flink operator interface
 */
trait GeneratedOperator[T <: StreamOperator[_]] {
  /**
   * Compiles the generated code (if needed) and creates a new operator instance
   * @param classLoader Class loader used to compile and load the generated class
   * @return New operator instance
   */
  def newInstance(classLoader: ClassLoader): T

  /**
   * Returns the generated operator code
   * @return Generated Java code
   */
  def getCode: String
}
```

## Code Generation Configuration

The configuration option that most directly shapes generated code is the length threshold at which large generated methods are split into sub-function calls:

```java
import org.apache.flink.table.api.config.TableConfigOptions;

// Length threshold above which generated code is split into sub-function calls.
// Java caps a method at 64 KB of bytecode, and HotSpot by default does not
// JIT-compile methods larger than 8000 bytes of bytecode, so the default is kept low (4000).
tableConfig.getConfiguration().set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 4000);

// Equivalent string-keyed form
tableConfig.getConfiguration().setString("table.generated-code.max-length", "4000");
```
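Java caps a single method at 64 KB of bytecode, so long generated method bodies must be broken up. The plain-Java sketch below shows the basic idea behind a length threshold. Statements are packed greedily into helper methods whose source length stays under the limit, and the original method just calls the helpers in order.

This is a simplification, not Flink's code splitter. In particular, it assumes the statements share no local variables; a real splitter has to handle those, for example by turning locals into fields. `MethodSplitSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of length-based method splitting (not Flink's code splitter).
// Assumes the statements share no local variables, so they can be moved freely.
final class MethodSplitSketch {
    static String split(String methodName, List<String> statements, int maxLength) {
        // Greedily pack statements into chunks whose total length stays within maxLength
        List<List<String>> chunks = new ArrayList<>();
        List<String> chunk = new ArrayList<>();
        int chunkLength = 0;
        for (String stmt : statements) {
            // Close the chunk if this statement would exceed the limit;
            // a single oversized statement still gets a chunk of its own
            if (!chunk.isEmpty() && chunkLength + stmt.length() > maxLength) {
                chunks.add(chunk);
                chunk = new ArrayList<>();
                chunkLength = 0;
            }
            chunk.add(stmt);
            chunkLength += stmt.length();
        }
        if (!chunk.isEmpty()) {
            chunks.add(chunk);
        }

        // The original method becomes a sequence of calls to the helpers
        StringBuilder out = new StringBuilder();
        out.append("void ").append(methodName).append("() {\n");
        for (int i = 0; i < chunks.size(); i++) {
            out.append("  ").append(methodName).append("_split").append(i).append("();\n");
        }
        out.append("}\n");
        // Emit one helper method per chunk
        for (int i = 0; i < chunks.size(); i++) {
            out.append("void ").append(methodName).append("_split").append(i).append("() {\n");
            for (String s : chunks.get(i)) {
                out.append("  ").append(s).append('\n');
            }
            out.append("}\n");
        }
        return out.toString();
    }
}
```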
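
The generated Java source is compiled at runtime with the Janino compiler. Each class is specialized to the concrete field types and expressions of a query. It therefore avoids the per-row cost of interpreting an expression tree, such as type dispatch, boxing, and virtual calls between expression nodes. It also hands the JVM's JIT compiler compact, straight-line code that it can inline and optimize.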
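The difference can be seen on a single expression. The sketch below evaluates `a + b * 2` in two ways:

- by walking a tree of expression objects, where every node is reached through an interface call;
- as the kind of straight-line method a code generator would emit.

Both give the same result, but the second form has no per-node dispatch for the JIT to see through. `InterpretVsGenerated` and its members are hypothetical names, not Flink code.

```java
// Hypothetical contrast sketch (not Flink code) for the expression a + b * 2 over a row of longs.
final class InterpretVsGenerated {
    // Interpreted form: each tree node is a separate object reached via an interface call
    interface Expr {
        long eval(long[] row);
    }

    static Expr field(int index) {
        return row -> row[index];
    }

    static Expr literal(long value) {
        return row -> value;
    }

    static Expr add(Expr left, Expr right) {
        return row -> left.eval(row) + right.eval(row);
    }

    static Expr mul(Expr left, Expr right) {
        return row -> left.eval(row) * right.eval(row);
    }

    static final Expr TREE = add(field(0), mul(field(1), literal(2)));

    // "Generated" form: the same expression as straight-line code with no per-node dispatch
    static long generated(long[] row) {
        return row[0] + row[1] * 2L;
    }
}
```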