
# Code Generation


The code generation system produces Java code at runtime for high-performance query execution. It generates specialized code for calculations, aggregations, projections, and other operations and compiles it with the Janino compiler. This removes the overhead of interpreting expressions row by row.


## Capabilities


### CalcCodeGenerator - Calculation Code Generation


Generates optimized code for general calculations including expressions, filters, and projections.


```scala { .api }

/**
 * Code generator for calculation operations and expressions
 */
object CalcCodeGenerator {

  /**
   * Generates a calc operator for executing calculations and projections
   * @param ctx Code generation context
   * @param inputTransform Input transformation providing the data
   * @param outputType Output row type information
   * @param projection Sequence of RexNodes for projection
   * @param condition Optional condition for filtering
   * @param retainHeader Whether to retain row header information
   * @param opName Name for the generated operator
   * @return Generated operator factory for calculation operations
   */
  def generateCalcOperator(
      ctx: CodeGeneratorContext,
      inputTransform: Transformation[RowData],
      outputType: RowType,
      projection: Seq[RexNode],
      condition: Option[RexNode],
      retainHeader: Boolean = false,
      opName: String
  ): CodeGenOperatorFactory[RowData]

  /**
   * Generates internal function for calculation with custom parameters (private API)
   * @param inputType Input row type
   * @param name Function name
   * @param returnType Return row type
   * @param outRowClass Output row data class
   * @param calcProjection Calculation projection nodes
   * @param calcCondition Optional calculation condition
   * @param config Table configuration
   * @return Generated function for flat map operations
   */
  private[flink] def generateFunction[T <: Function](
      inputType: RowType,
      name: String,
      returnType: RowType,
      outRowClass: Class[_ <: RowData],
      calcProjection: Seq[RexNode],
      calcCondition: Option[RexNode],
      config: TableConfig
  ): GeneratedFunction[FlatMapFunction[RowData, RowData]]
}

```


**Usage Example:**


```scala

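import org.apache.calcite.rex.RexNode
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.data.{GenericRowData, RowData}
import org.apache.flink.table.planner.codegen.CalcCodeGenerator
import org.apache.flink.table.types.logical.RowType

// Typically called internally by the planner. generateFunction is private[flink],
// so this call only compiles from code inside the org.apache.flink package.
val inputType: RowType = ???          // input schema
val outputType: RowType = ???         // output schema
val projection: Seq[RexNode] = ???    // projection expressions, e.g. from an optimized RexProgram
val condition: Option[RexNode] = ???  // optional filter condition
val tableConfig: TableConfig = ???
val classLoader: ClassLoader = Thread.currentThread().getContextClassLoader

// Arguments follow the generateFunction signature shown above
val generatedFunction = CalcCodeGenerator.generateFunction(
  inputType, "MyCalc", outputType, classOf[GenericRowData], projection, condition, tableConfig)

// Compile and instantiate the generated class; inside a Flink operator,
// call calc.flatMap(row, collector) for each input row.
val calc: FlatMapFunction[RowData, RowData] = generatedFunction.newInstance(classLoader)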

```
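To make the idea concrete, the hand-written Java class below sketches the kind of fused filter-and-project logic a generated calc function might contain, for a hypothetical query `SELECT a + b, c FROM t WHERE a > 10`. Because the documented `generateFunction` returns a `FlatMapFunction`, the sketch uses a `flatMap`-shaped method. It uses `Object[]` rows and a `List` in place of Flink's `RowData` and `Collector`. The class and method names are illustrative and do not reproduce the planner's actual output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hand-written stand-in for a generated calc function (hypothetical names).
// Query: SELECT a + b, c FROM t WHERE a > 10, with row = [a: Integer, b: Integer, c: String].
class MyCalcSketch {

    // Filter and projection are fused into one method: no expression tree is walked per row.
    void flatMap(Object[] in, List<Object[]> out) {
        Integer a = (Integer) in[0];
        // SQL semantics: a NULL comparison result is not TRUE, so the row is dropped
        if (a == null || a <= 10) {
            return;
        }
        Integer b = (Integer) in[1];
        // a + b is NULL if either operand is NULL
        Integer sum = (b == null) ? null : a + b;
        out.add(new Object[] {sum, in[2]});
    }

    public static void main(String[] args) {
        MyCalcSketch calc = new MyCalcSketch();
        List<Object[]> out = new ArrayList<>();
        calc.flatMap(new Object[] {5, 1, "x"}, out);     // filtered out
        calc.flatMap(new Object[] {20, 2, "y"}, out);    // emits [22, y]
        calc.flatMap(new Object[] {30, null, "z"}, out); // emits [null, z]
        for (Object[] row : out) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```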


### ProjectionCodeGenerator - Projection Code Generation


Generates specialized code for column projections and field access operations.


```scala { .api }

/**
 * Code generator for projection operations
 */
object ProjectionCodeGenerator {

  /**
   * Generates projection code for accessing specific fields
   * @param ctx Code generation context
   * @param name Name for the generated projection
   * @param inType Input row type information
   * @param outType Output row type information
   * @param inputMapping Array of field indices to project
   * @param outClass Output row data class
   * @param inputTerm Input term name for code generation
   * @param outRecordTerm Output record term name
   * @param outRecordWriterTerm Output record writer term name
   * @param reusedOutRecord Whether to reuse output record instances
   * @return Generated projection function
   */
  def generateProjection(
      ctx: CodeGeneratorContext,
      name: String,
      inType: RowType,
      outType: RowType,
      inputMapping: Array[Int],
      outClass: Class[_ <: RowData] = classOf[BinaryRowData],
      inputTerm: String = DEFAULT_INPUT1_TERM,
      outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
      outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
      reusedOutRecord: Boolean = true
  ): GeneratedProjection

  /**
   * Generates projection expression for field transformation
   * @param ctx Code generation context
   * @param inType Input row type information
   * @param outType Output row type information
   * @param inputMapping Array of field indices to project
   * @param outClass Output row data class
   * @param inputTerm Input term name for code generation
   * @param outRecordTerm Output record term name
   * @param outRecordWriterTerm Output record writer term name
   * @param reusedOutRecord Whether to reuse output record instances
   * @return Generated expression for projection
   */
  def generateProjectionExpression(
      ctx: CodeGeneratorContext,
      inType: RowType,
      outType: RowType,
      inputMapping: Array[Int],
      outClass: Class[_ <: RowData] = classOf[BinaryRowData],
      inputTerm: String = DEFAULT_INPUT1_TERM,
      outRecordTerm: String = DEFAULT_OUT_RECORD_TERM,
      outRecordWriterTerm: String = DEFAULT_OUT_RECORD_WRITER_TERM,
      reusedOutRecord: Boolean = true
  ): GeneratedExpression
}

/**
 * Generated projection function interface
 */
trait GeneratedProjection {
  /**
   * Applies projection to input row data
   * @param input Input row data
   * @return Projected row data
   */
  def apply(input: RowData): RowData
}

```
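The `inputMapping` and `reusedOutRecord` parameters can be illustrated with a small Java sketch. Output field `i` is copied from input field `inputMapping[i]`. When reuse is enabled, one output row is overwritten on every call instead of a new one being allocated. `Object[]` stands in for `RowData`/`BinaryRowData`, and the class name is hypothetical.

```java
import java.util.Arrays;

// Hypothetical sketch of what a generated projection does; Object[] stands in for RowData.
class ProjectionSketch {
    private final int[] inputMapping;      // output field i <- input field inputMapping[i]
    private final boolean reusedOutRecord;
    private final Object[] reuse;          // single output row, used only when reusedOutRecord is true

    ProjectionSketch(int[] inputMapping, boolean reusedOutRecord) {
        this.inputMapping = inputMapping;
        this.reusedOutRecord = reusedOutRecord;
        this.reuse = new Object[inputMapping.length];
    }

    Object[] apply(Object[] input) {
        Object[] out = reusedOutRecord ? reuse : new Object[inputMapping.length];
        // Generated code would unroll this loop into one typed statement per field
        for (int i = 0; i < inputMapping.length; i++) {
            out[i] = input[inputMapping[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        ProjectionSketch p = new ProjectionSketch(new int[] {2, 0}, true);
        Object[] first = p.apply(new Object[] {"a", 1, true});
        System.out.println(Arrays.toString(first)); // [true, a]
        Object[] second = p.apply(new Object[] {"b", 2, false});
        // With reuse enabled, both calls return the same instance
        System.out.println(first == second);        // true
    }
}
```

Reusing the output row avoids one allocation per record. The trade-off is that a caller that keeps a projected row must copy it before the next call.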


### EqualiserCodeGenerator - Equality Comparison Code Generation


Generates efficient code for equality comparisons and key extraction operations.


```scala { .api }

/**
 * Code generator for equality comparisons and key operations
 */
object EqualiserCodeGenerator {

  /**
   * Generates equaliser for comparing row data instances
   * @param fieldTypes Types of fields to compare
   * @param config Table configuration
   * @param classLoader Class loader for generated code
   * @return Generated equaliser function
   */
  def generateRecordEqualiser(
      fieldTypes: Array[DataType],
      config: TableConfig,
      classLoader: ClassLoader
  ): GeneratedRecordEqualiser

  /**
   * Generates key equaliser for join and aggregation operations
   * @param keyTypes Types of key fields
   * @param nullsAreEqual Whether null values should be considered equal
   * @return Generated key equaliser
   */
  def generateKeyEqualiser(
      keyTypes: Array[DataType],
      nullsAreEqual: Boolean
  ): GeneratedEqualiser
}

/**
 * Generated equaliser interface for row comparisons
 */
trait GeneratedRecordEqualiser {
  /**
   * Tests equality between two row data instances
   * @param left First row to compare
   * @param right Second row to compare
   * @return True if rows are equal, false otherwise
   */
  def equals(left: RowData, right: RowData): Boolean
}

```
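The comparison logic a generated equaliser encodes can be sketched in Java as a field-by-field check with explicit null handling. The `nullsAreEqual` flag mirrors the parameter of `generateKeyEqualiser`. In this hypothetical class, `Object[]` stands in for `RowData`, and `Objects.equals` stands in for the type-specific comparison a generator would emit for each field.

```java
import java.util.Objects;

// Hypothetical sketch of a generated key equaliser over Object[] rows.
class KeyEqualiserSketch {
    private final boolean nullsAreEqual;

    KeyEqualiserSketch(boolean nullsAreEqual) {
        this.nullsAreEqual = nullsAreEqual;
    }

    boolean equals(Object[] left, Object[] right) {
        if (left.length != right.length) {
            return false;
        }
        for (int i = 0; i < left.length; i++) {
            boolean leftNull = left[i] == null;
            boolean rightNull = right[i] == null;
            if (leftNull || rightNull) {
                // NULL vs NULL matches only when nulls are treated as equal;
                // NULL vs non-NULL never matches.
                if (!(leftNull && rightNull && nullsAreEqual)) {
                    return false;
                }
            } else if (!Objects.equals(left[i], right[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Object[] k1 = {1, null};
        Object[] k2 = {1, null};
        System.out.println(new KeyEqualiserSketch(true).equals(k1, k2));  // true
        System.out.println(new KeyEqualiserSketch(false).equals(k1, k2)); // false
    }
}
```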


### Aggregation Code Generation


Specialized code generation for aggregation operations including hash-based and sort-based aggregations.


```scala { .api }

/**
 * Code generator for aggregation handlers
 */
object AggsHandlerCodeGenerator {

  /**
   * Generates aggregation handler for processing aggregation functions
   * @param aggInfos Information about aggregation functions
   * @param inputType Input row type
   * @param grouping Grouping specification
   * @param config Table configuration
   * @return Generated aggregation handler
   */
  def generateAggsHandler(
      aggInfos: Array[AggregateInfo],
      inputType: RowType,
      grouping: Array[Int],
      config: TableConfig
  ): GeneratedAggsHandler
}

/**
 * Hash-based aggregation code generator
 */
object HashAggCodeGenerator {

  /**
   * Generates hash aggregation operator
   * @param aggInfos Aggregation function information
   * @param inputType Input data type
   * @param outputType Output data type
   * @param grouping Grouping key specification
   * @return Generated hash aggregation operator
   */
  def generate(
      aggInfos: Array[AggregateInfo],
      inputType: RowType,
      outputType: RowType,
      grouping: Array[Int]
  ): GeneratedOperator[OneInputStreamOperator[RowData, RowData]]
}

/**
 * Sort-based aggregation code generator
 */
object SortAggCodeGenerator {

  /**
   * Generates sort aggregation operator
   * @param aggInfos Aggregation function information
   * @param inputType Input data type
   * @param outputType Output data type
   * @param grouping Grouping specification
   * @param orderKeys Sort order specification
   * @return Generated sort aggregation operator
   */
  def generate(
      aggInfos: Array[AggregateInfo],
      inputType: RowType,
      outputType: RowType,
      grouping: Array[Int],
      orderKeys: Array[Int]
  ): GeneratedOperator[OneInputStreamOperator[RowData, RowData]]
}

```
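The difference between the two strategies can be sketched in Java for `SELECT k, SUM(v) ... GROUP BY k`.

- **Hash variant:** keeps one accumulator per key in a hash map and emits all results at the end.
- **Sort variant:** assumes the input is already sorted by the grouping key and emits a result whenever the key changes, so it holds only one accumulator at a time.

The class and method names are hypothetical. The generated Flink operators work on `RowData` and managed memory rather than Java collections.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketches of hash-based and sort-based SUM(v) GROUP BY k.
// Each input row is a long[] {k, v}; each output row is a long[] {k, sum}.
class AggSketch {

    // Hash aggregation: input order does not matter, memory grows with the number of keys.
    static List<long[]> hashAgg(List<long[]> input) {
        Map<Long, Long> acc = new LinkedHashMap<>(); // insertion order keeps output deterministic
        for (long[] row : input) {
            acc.merge(row[0], row[1], Long::sum);
        }
        List<long[]> out = new ArrayList<>();
        for (Map.Entry<Long, Long> e : acc.entrySet()) {
            out.add(new long[] {e.getKey(), e.getValue()});
        }
        return out;
    }

    // Sort aggregation: input must be sorted by k; only one accumulator is live at a time.
    static List<long[]> sortAgg(List<long[]> sortedInput) {
        List<long[]> out = new ArrayList<>();
        boolean hasCurrent = false;
        long currentKey = 0;
        long sum = 0;
        for (long[] row : sortedInput) {
            if (hasCurrent && row[0] != currentKey) {
                out.add(new long[] {currentKey, sum}); // key changed: emit the finished group
                sum = 0;
            }
            currentKey = row[0];
            sum += row[1];
            hasCurrent = true;
        }
        if (hasCurrent) {
            out.add(new long[] {currentKey, sum}); // emit the last group
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> sorted = List.of(new long[] {1, 10}, new long[] {1, 5}, new long[] {2, 7});
        for (long[] r : sortAgg(sorted)) {
            System.out.println(r[0] + " -> " + r[1]); // 1 -> 15, then 2 -> 7
        }
    }
}
```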


### WatermarkGeneratorCodeGen - Watermark Code Generation


Generates code for watermark generation in streaming scenarios.


```scala { .api }

/**
 * Code generator for watermark generation
 */
object WatermarkGeneratorCodeGen {

  /**
   * Generates watermark generator for streaming sources
   * @param rowtimeFieldIndex Index of rowtime field
   * @param watermarkExpr Watermark generation expression
   * @param inputType Input row type
   * @param config Table configuration
   * @return Generated watermark generator
   */
  def generateWatermarkGenerator(
      rowtimeFieldIndex: Int,
      watermarkExpr: RexNode,
      inputType: RowType,
      config: TableConfig
  ): GeneratedWatermarkGenerator
}

/**
 * Generated watermark generator interface
 */
trait GeneratedWatermarkGenerator {
  /**
   * Generates watermark for given row data
   * @param row Input row containing timestamp
   * @return Generated watermark timestamp
   */
  def currentWatermark(row: RowData): Long
}

```
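As a sketch, consider a bounded out-of-orderness watermark such as `WATERMARK FOR ts AS ts - INTERVAL '5' SECOND`. The generated `currentWatermark` evaluates only that expression for one row. The sketch assumes the surrounding operator keeps the maximum value seen so far, so the emitted watermark never moves backwards. The Java below hand-writes both parts with `Object[]` rows in place of `RowData`. All names are hypothetical, and returning `null` for a NULL rowtime is an assumption made for this example.

```java
// Hypothetical sketch of bounded out-of-orderness watermarking:
// WATERMARK FOR ts AS ts - INTERVAL '5' SECOND, with ts in epoch milliseconds.
class WatermarkSketch {

    // What a generated watermark expression computes for a single row.
    static final class BoundedDelayGenerator {
        private final int rowtimeFieldIndex;
        private final long delayMillis;

        BoundedDelayGenerator(int rowtimeFieldIndex, long delayMillis) {
            this.rowtimeFieldIndex = rowtimeFieldIndex;
            this.delayMillis = delayMillis;
        }

        // Returns null when the rowtime is NULL (no watermark candidate for this row).
        Long currentWatermark(Object[] row) {
            Long ts = (Long) row[rowtimeFieldIndex];
            return ts == null ? null : ts - delayMillis;
        }
    }

    // Assumed operator behaviour: only ever advance the watermark.
    static final class WatermarkTracker {
        private final BoundedDelayGenerator generator;
        private long currentWatermark = Long.MIN_VALUE;

        WatermarkTracker(BoundedDelayGenerator generator) {
            this.generator = generator;
        }

        long onRow(Object[] row) {
            Long candidate = generator.currentWatermark(row);
            if (candidate != null && candidate > currentWatermark) {
                currentWatermark = candidate;
            }
            return currentWatermark;
        }
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(new BoundedDelayGenerator(0, 5_000L));
        System.out.println(tracker.onRow(new Object[] {10_000L})); // 5000
        System.out.println(tracker.onRow(new Object[] {8_000L}));  // still 5000 (late row)
        System.out.println(tracker.onRow(new Object[] {20_000L})); // 15000
    }
}
```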


### HashCodeGenerator - Hash Code Generation


Generates efficient hash code computation for keys and records.


```scala { .api }

/**
 * Code generator for hash code computation
 */
object HashCodeGenerator {

  /**
   * Generates hash code computer for row data
   * @param fieldTypes Types of fields to hash
   * @param config Table configuration
   * @return Generated hash code computer
   */
  def generateRowHash(
      fieldTypes: Array[DataType],
      config: TableConfig
  ): GeneratedHashFunction

  /**
   * Generates hash code computer for key fields
   * @param keyTypes Types of key fields
   * @param keyIndices Indices of key fields in the row
   * @return Generated key hash function
   */
  def generateKeyHash(
      keyTypes: Array[DataType],
      keyIndices: Array[Int]
  ): GeneratedHashFunction
}

/**
 * Generated hash function interface
 */
trait GeneratedHashFunction {
  /**
   * Computes hash code for row data
   * @param row Row data to hash
   * @return Hash code value
   */
  def hashCode(row: RowData): Int
}

```
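A generated row hash combines per-field hash codes in a fixed order, so rows the equaliser considers equal also hash to the same value. The Java sketch below uses the common multiply-by-31 combination and hashes NULL fields to 0. Both choices are assumptions for illustration, and the constants Flink's generator actually emits may differ. `Object[]` stands in for `RowData`, and `keyIndices` plays the role of the parameter of `generateKeyHash`.

```java
// Hypothetical sketch of a generated key hash function over Object[] rows.
class KeyHashSketch {
    private final int[] keyIndices; // positions of the key fields in the row

    KeyHashSketch(int[] keyIndices) {
        this.keyIndices = keyIndices;
    }

    int hashCode(Object[] row) {
        int hash = 0;
        // Generated code would unroll this loop and call a type-specific hash per field
        for (int index : keyIndices) {
            Object field = row[index];
            hash = 31 * hash + (field == null ? 0 : field.hashCode());
        }
        return hash;
    }

    public static void main(String[] args) {
        KeyHashSketch h = new KeyHashSketch(new int[] {0, 2});
        // Rows that agree on the key fields (0 and 2) hash the same, whatever field 1 holds
        int a = h.hashCode(new Object[] {"k", 1, 42L});
        int b = h.hashCode(new Object[] {"k", 2, 42L});
        System.out.println(a == b); // true
    }
}
```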


### Over Window Code Generation


Specialized code generation for over window operations including range and row-based windows.


```scala { .api }

/**
 * Code generator for range-based window comparisons
 */
object RangeBoundComparatorCodeGenerator {

  /**
   * Generates range bound comparator for window operations
   * @param boundType Type of the bound field
   * @param isLowerBound Whether this is a lower bound comparator
   * @param config Table configuration
   * @return Generated range bound comparator
   */
  def generate(
      boundType: DataType,
      isLowerBound: Boolean,
      config: TableConfig
  ): GeneratedRecordComparator
}

/**
 * Multi-field range bound comparator generator
 */
object MultiFieldRangeBoundComparatorCodeGenerator {

  /**
   * Generates comparator for multi-field range bounds
   * @param orderKeys Array of order key information
   * @param boundTypes Types of bound fields
   * @param orders Sort orders for each field
   * @return Generated multi-field comparator
   */
  def generate(
      orderKeys: Array[Int],
      boundTypes: Array[DataType],
      orders: Array[SortDirection]
  ): GeneratedRecordComparator
}

```
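Consider a frame such as `RANGE BETWEEN 10 PRECEDING AND CURRENT ROW` over a numeric order key. The lower-bound comparator decides whether an earlier input row still falls inside the current row's frame, that is, whether `input.key >= current.key - 10`. The Java sketch below hand-writes that check for a single ascending `long` key and ignores NULL keys and descending order for brevity. The class name, the sign convention of the result, and the `Object[]` row representation are assumptions for illustration.

```java
// Hypothetical sketch of a lower-bound range comparator for
// RANGE BETWEEN <offset> PRECEDING AND CURRENT ROW on an ascending long order key.
class RangeLowerBoundSketch {
    private final int orderKeyIndex;
    private final long precedingOffset;

    RangeLowerBoundSketch(int orderKeyIndex, long precedingOffset) {
        this.orderKeyIndex = orderKeyIndex;
        this.precedingOffset = precedingOffset;
    }

    // Negative: input precedes the frame start; 0: input is on the frame start;
    // positive: input is after the frame start.
    int compare(Object[] input, Object[] current) {
        long inputKey = (Long) input[orderKeyIndex];
        long frameStart = (Long) current[orderKeyIndex] - precedingOffset;
        return Long.compare(inputKey, frameStart);
    }

    public static void main(String[] args) {
        RangeLowerBoundSketch cmp = new RangeLowerBoundSketch(0, 10L);
        Object[] current = {100L};
        System.out.println(cmp.compare(new Object[] {85L}, current) < 0);  // true: outside the frame
        System.out.println(cmp.compare(new Object[] {90L}, current) == 0); // true: on the bound
        System.out.println(cmp.compare(new Object[] {95L}, current) > 0);  // true: inside the frame
    }
}
```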


## Generated Code Interface


### GeneratedFunction


Base interface for all generated functions:


```scala { .api }

/**
 * Base interface for generated functions
 */
trait GeneratedFunction[F] {
  /**
   * Creates new instance of the generated function
   * @param classLoader Class loader for instantiation
   * @return New function instance
   */
  def newInstance(classLoader: ClassLoader): F

  /**
   * Returns generated code as string (for debugging)
   * @return Generated Java code
   */
  def getCode: String

  /**
   * Returns class name of generated function
   * @return Generated class name
   */
  def getClassName: String
}

```
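The `newInstance` / `getCode` / `getClassName` contract fits a common pattern. The holder keeps the generated source and class name, compiles lazily on first use, caches the compiled class, and creates a fresh instance for each caller. The Java sketch below implements that pattern with the JDK's `javax.tools` compiler in place of Janino, so it runs without extra dependencies. For the same reason it needs a JDK, not just a JRE. `GeneratedClassSketch` and the `GenDoubler` source are hypothetical.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.IntUnaryOperator;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

// Hypothetical holder mirroring getCode/getClassName/newInstance; compiles with javax.tools, not Janino.
class GeneratedClassSketch<F> {
    private final String className;
    private final String code;
    private Class<?> compiled; // cached after the first compilation

    GeneratedClassSketch(String className, String code) {
        this.className = className;
        this.code = code;
    }

    String getCode() { return code; }

    String getClassName() { return className; }

    @SuppressWarnings("unchecked")
    synchronized F newInstance(ClassLoader parent) {
        try {
            if (compiled == null) {
                compiled = compile(parent);
            }
            // A new object per call, but the class is compiled only once
            return (F) compiled.getDeclaredConstructor().newInstance();
        } catch (IOException | ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + className, e);
        }
    }

    private Class<?> compile(ClassLoader parent) throws IOException, ClassNotFoundException {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("no system Java compiler (running on a JRE?)");
        }
        Path dir = Files.createTempDirectory("codegen");
        Path source = dir.resolve(className + ".java");
        Files.writeString(source, code);
        // Without -d, javac writes the .class file next to the source file
        if (compiler.run(null, null, null, source.toString()) != 0) {
            throw new IllegalStateException("compilation failed for " + className);
        }
        URLClassLoader loader = new URLClassLoader(new URL[] {dir.toUri().toURL()}, parent);
        return loader.loadClass(className);
    }

    public static void main(String[] args) {
        GeneratedClassSketch<IntUnaryOperator> gen = new GeneratedClassSketch<>(
            "GenDoubler",
            "public class GenDoubler implements java.util.function.IntUnaryOperator {"
                + " public int applyAsInt(int x) { return x * 2; } }");
        IntUnaryOperator f = gen.newInstance(GeneratedClassSketch.class.getClassLoader());
        System.out.println(f.applyAsInt(21)); // 42
    }
}
```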


### GeneratedOperator


Interface for generated Flink operators:


```scala { .api }

/**
 * Generated Flink operator interface
 */
trait GeneratedOperator[T <: StreamOperator[_]] {
  /**
   * Creates new instance of generated operator
   * @param classLoader Class loader used to compile and load the generated class
   * @return New operator instance
   */
  def newInstance(classLoader: ClassLoader): T

  /**
   * Returns generated operator code
   * @return Generated Java code
   */
  def getCode: String
}

```


## Code Generation Configuration


Key configuration options for code generation:


```java

// Keys as listed in this document; check them against the configuration reference of
// your Flink version (released versions document "table.generated-code.max-length"
// as the threshold for splitting generated code into smaller methods).

// Enable/disable code generation
tableConfig.getConfiguration().setString("table.exec.codegen.enabled", "true");

// Set maximum generated code length
tableConfig.getConfiguration().setString("table.exec.codegen.length.max", "64000");

// Enable null check elimination optimization
tableConfig.getConfiguration().setString("table.exec.codegen.null-check", "true");

// Configure string concatenation method
tableConfig.getConfiguration().setString("table.exec.codegen.string.concat", "true");

```
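A code-length limit matters because the JVM caps a single method's bytecode at 64 KB. Very wide projections or long CASE chains therefore have to be split into helper methods. The Java sketch below shows the basic idea on plain source text: statements are packed into helper methods until a character budget would be exceeded, and the entry method calls the helpers in order. A real splitter must also handle local variables shared across statements; this sketch assumes that problem away. `CodeSplitterSketch` and `maxLength` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of splitting generated statements into helper methods by length.
// Assumes statements only touch fields, not locals, so they can move between methods freely.
class CodeSplitterSketch {

    static String split(String methodName, List<String> statements, int maxLength) {
        List<StringBuilder> bodies = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String stmt : statements) {
            String line = "    " + stmt + "\n";
            // Start a new helper when adding this line would exceed the budget
            if (current.length() > 0 && current.length() + line.length() > maxLength) {
                bodies.add(current);
                current = new StringBuilder();
            }
            current.append(line);
        }
        if (current.length() > 0) {
            bodies.add(current);
        }

        StringBuilder out = new StringBuilder();
        // Entry method: calls every helper in order
        out.append("void ").append(methodName).append("() {\n");
        for (int i = 0; i < bodies.size(); i++) {
            out.append("    ").append(methodName).append("_split").append(i).append("();\n");
        }
        out.append("}\n");
        // Helper methods, each within the length budget (unless a single statement is longer)
        for (int i = 0; i < bodies.size(); i++) {
            out.append("void ").append(methodName).append("_split").append(i).append("() {\n")
               .append(bodies.get(i)).append("}\n");
        }
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> stmts = List.of("f0 = in0 + 1;", "f1 = in1 * 2;", "f2 = in2 - 3;");
        System.out.println(split("project", stmts, 40));
    }
}
```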


The generated code is compiled at runtime with the Janino compiler. Compared with interpreted execution, it avoids a virtual method call per expression node for every row, and it gives the JIT compiler straight-line, type-specialized code to optimize.
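The difference between interpreted and generated evaluation can be sketched in Java for the expression `a * 2 + b`. The interpreter walks an expression tree and makes one virtual call per node for every row. The generated form is straight-line code that the JIT can inline and optimize. The `Expr` helpers are hypothetical, and the "generated" method is written by hand to stand in for what a code generator would emit.

```java
// Hypothetical illustration: interpreting an expression tree vs. running specialized code.
class InterpretVsGenerated {

    interface Expr { long eval(long[] row); }

    static Expr field(int index) { return row -> row[index]; }
    static Expr literal(long value) { return row -> value; }
    static Expr add(Expr l, Expr r) { return row -> l.eval(row) + r.eval(row); }
    static Expr mul(Expr l, Expr r) { return row -> l.eval(row) * r.eval(row); }

    // Interpreted: a * 2 + b costs five virtual eval calls per row.
    static final Expr TREE = add(mul(field(0), literal(2)), field(1));

    // "Generated": the same expression as straight-line code.
    static long generated(long[] row) {
        return row[0] * 2 + row[1];
    }

    public static void main(String[] args) {
        long[] row = {20, 2};
        System.out.println(TREE.eval(row)); // 42
        System.out.println(generated(row)); // 42
    }
}
```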