# Configuration and Builders

The configuration classes control how the Flink table planner uses Apache Calcite. Through a builder-pattern API, they let you customize SQL parsing, query optimization, and operator handling.

## Package Information

```scala
import org.apache.flink.table.planner.calcite.{CalciteConfig, CalciteConfigBuilder}
import org.apache.flink.table.planner.delegation.PlannerConfiguration
import org.apache.calcite.sql.parser.SqlParser
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.config.CalciteConnectionConfig
```

```java
import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.delegation.PlannerConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.Configuration;
```
## Capabilities

### CalciteConfig

Core configuration trait for customizing Apache Calcite behavior within the Flink planner.

```scala { .api }
trait CalciteConfig {

  // SQL Parser Configuration
  def getSqlParserConfig: Option[SqlParser.Config]

  // SQL Operator Tables
  def getSqlOperatorTable: Option[SqlOperatorTable]

  // Optimization Programs
  def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]
  def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]

  // Converter and Connection Configuration
  def getSqlToRelConverterConfig: Option[SqlToRelConverter.Config]
  def getCalciteConnectionConfig: Option[CalciteConnectionConfig]
}
```

The `CalciteConfig` trait provides access to all major customization points in the Calcite integration:

- **Parser Configuration**: Customize SQL dialect, keywords, and parsing behavior
- **Operator Tables**: Add custom functions and operators beyond built-in ones
- **Optimization Programs**: Define custom optimization rule sequences for batch and streaming
- **Converter Configuration**: Control SQL-to-relational algebra conversion settings

### CalciteConfig Builder

Factory methods and builder for creating `CalciteConfig` instances.

```scala { .api }
object CalciteConfig {
  def createBuilder(): CalciteConfigBuilder
  def createBuilder(calciteConfig: CalciteConfig): CalciteConfigBuilder
}

class CalciteConfigBuilder {

  // SQL Parser Customization
  def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder

  // SQL Operator Table Customization
  def replaceSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder
  def addSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder

  // Optimization Program Customization
  def replaceBatchProgram(program: FlinkChainedProgram[BatchOptimizeContext]): CalciteConfigBuilder
  def replaceStreamProgram(program: FlinkChainedProgram[StreamOptimizeContext]): CalciteConfigBuilder

  // Converter Configuration
  def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): CalciteConfigBuilder
  def replaceCalciteConnectionConfig(config: CalciteConnectionConfig): CalciteConfigBuilder

  // Build final configuration
  def build(): CalciteConfig
}
```

**Key Builder Methods:**

- **`replaceSqlParserConfig(...)`**: Completely replaces the default parser configuration
- **`addSqlOperatorTable(...)`**: Adds custom operators to the built-in operator table
- **`replaceSqlOperatorTable(...)`**: Completely replaces the built-in operator table
- **`replaceBatchProgram(...)`**: Replaces the default batch optimization program
- **`replaceStreamProgram(...)`**: Replaces the default stream optimization program

**Usage Example:**

```scala
import org.apache.calcite.config.Lex
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.planner.calcite.CalciteConfig

// Create custom parser configuration
val parserConfig = SqlParser.Config.DEFAULT
  .withLex(Lex.MYSQL)
  .withIdentifierMaxLength(256)

// Create custom operator table
val customOperatorTable: SqlOperatorTable = ??? // your custom operator table

// Build CalciteConfig with customizations
// (myCustomBatchProgram is a FlinkChainedProgram[BatchOptimizeContext] defined elsewhere)
val calciteConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(parserConfig)
  .addSqlOperatorTable(customOperatorTable)
  .replaceBatchProgram(myCustomBatchProgram)
  .build()

// Use with table environment
val tableConfig = new TableConfig()
tableConfig.setPlannerConfig(calciteConfig)
```
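
The difference between `addSqlOperatorTable` and `replaceSqlOperatorTable`, and the way `createBuilder(calciteConfig)` starts from an existing configuration, can be illustrated with a small stand-alone model. The sketch below is plain Java with no Flink or Calcite dependency. `OperatorTableSketch`, its nested `Config` and `Builder`, and the string table names are all hypothetical stand-ins, and the lookup order it shows is an assumption of the model, not a statement about the planner's internals.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: these classes are hypothetical and are NOT Flink's CalciteConfigBuilder.
final class OperatorTableSketch {

    /** Immutable result of build(): the configured tables plus a "replace" flag. */
    static final class Config {
        final List<String> tables;
        final boolean replacesBuiltIns;

        Config(List<String> tables, boolean replacesBuiltIns) {
            // Defensive copy so later builder changes cannot affect this config.
            this.tables = Collections.unmodifiableList(new ArrayList<>(tables));
            this.replacesBuiltIns = replacesBuiltIns;
        }

        /** Tables consulted in this model: built-ins first unless they were replaced. */
        List<String> effectiveTables() {
            List<String> result = new ArrayList<>();
            if (!replacesBuiltIns) {
                result.add("built-in");
            }
            result.addAll(tables);
            return result;
        }
    }

    static final class Builder {
        private final List<String> tables = new ArrayList<>();
        private boolean replacesBuiltIns = false;

        Builder() {}

        /** Mirrors the idea of createBuilder(existingConfig): start from a copy. */
        Builder(Config base) {
            tables.addAll(base.tables);
            replacesBuiltIns = base.replacesBuiltIns;
        }

        /** "add": keeps the built-ins and appends another table. */
        Builder addTable(String table) {
            tables.add(table);
            return this;
        }

        /** "replace": drops everything configured so far, including the built-ins. */
        Builder replaceTable(String table) {
            tables.clear();
            tables.add(table);
            replacesBuiltIns = true;
            return this;
        }

        Config build() {
            return new Config(tables, replacesBuiltIns);
        }
    }

    public static void main(String[] args) {
        Config added = new Builder().addTable("custom").build();
        Config replaced = new Builder().replaceTable("custom").build();
        Config derived = new Builder(added).addTable("extra").build();

        System.out.println(added.effectiveTables());    // [built-in, custom]
        System.out.println(replaced.effectiveTables()); // [custom]
        System.out.println(derived.effectiveTables());  // [built-in, custom, extra]
    }
}
```

In this model, deriving `derived` from `added` leaves `added` unchanged, which is the property that makes it safe to share one base configuration across several variants.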
### PlannerConfiguration

Unified configuration access for the planner module, implementing Flink's configuration system.

```java { .api }
public final class PlannerConfiguration implements ReadableConfig {

    // Constructor
    public PlannerConfiguration(
        Configuration configuration,
        ClassLoader classLoader,
        ModuleManager moduleManager,
        CatalogManager catalogManager,
        FunctionCatalog functionCatalog
    );

    // Configuration access methods
    public <T> T get(ConfigOption<T> option);
    public <T> Optional<T> getOptional(ConfigOption<T> option);
    public Configuration getConfiguration();

    // Component access methods
    public ClassLoader getClassLoader();
    public ModuleManager getModuleManager();
    public CatalogManager getCatalogManager();
    public FunctionCatalog getFunctionCatalog();
}
```

**Key Features:**
- **Unified Access**: Single point of access for all planner configuration
- **Type Safety**: Strongly typed configuration options through `ConfigOption<T>`
- **Component Integration**: Direct access to catalog, modules, and function registry
- **Immutable Design**: Configuration objects are immutable after creation

**Usage Example:**

```java
import java.time.Duration;
import java.util.Optional;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.planner.delegation.PlannerConfiguration;

// Create configuration
Configuration config = new Configuration();
config.set(TableConfigOptions.TABLE_SQL_DIALECT, "hive");

// Create planner configuration
// (classLoader, moduleManager, catalogManager, and functionCatalog come from the surrounding setup)
PlannerConfiguration plannerConfig = new PlannerConfiguration(
    config,
    classLoader,
    moduleManager,
    catalogManager,
    functionCatalog
);

// Access configuration values
String sqlDialect = plannerConfig.get(TableConfigOptions.TABLE_SQL_DIALECT);
// The source idle timeout option is defined in ExecutionConfigOptions
Optional<Duration> idleTimeout =
    plannerConfig.getOptional(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);

// Access components (distinct names avoid redeclaring the constructor arguments)
CatalogManager configuredCatalogManager = plannerConfig.getCatalogManager();
FunctionCatalog configuredFunctionCatalog = plannerConfig.getFunctionCatalog();
```
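
The two accessors `get` and `getOptional` differ in how they treat an option that was never set. In Flink's `ReadableConfig`, `get` falls back to the option's default value while `getOptional` returns an empty `Optional`. The stand-alone sketch below models that behavior in plain Java. `TypedOptionSketch`, `TypedOption`, `Settings`, and the key `sketch.dialect` are hypothetical names introduced only for illustration; they are not Flink classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only: a simplified stand-in for a typed option and a readable config.
final class TypedOptionSketch {

    /** A typed key with a default value (hypothetical, loosely modeled on ConfigOption<T>). */
    static final class TypedOption<T> {
        final String key;
        final Class<T> type;
        final T defaultValue;

        TypedOption(String key, Class<T> type, T defaultValue) {
            this.key = key;
            this.type = type;
            this.defaultValue = defaultValue;
        }
    }

    static final class Settings {
        private final Map<String, Object> values = new HashMap<>();

        <T> Settings set(TypedOption<T> option, T value) {
            values.put(option.key, value);
            return this;
        }

        /** Empty when the option was never set explicitly; the default is ignored. */
        <T> Optional<T> getOptional(TypedOption<T> option) {
            return Optional.ofNullable(values.get(option.key)).map(option.type::cast);
        }

        /** Returns the explicit value, or the option's default when none was set. */
        <T> T get(TypedOption<T> option) {
            return getOptional(option).orElse(option.defaultValue);
        }
    }

    public static void main(String[] args) {
        TypedOption<String> dialect = new TypedOption<>("sketch.dialect", String.class, "default");
        Settings settings = new Settings();

        System.out.println(settings.get(dialect));                     // default
        System.out.println(settings.getOptional(dialect).isPresent()); // false

        settings.set(dialect, "hive");
        System.out.println(settings.get(dialect));                     // hive
        System.out.println(settings.getOptional(dialect).isPresent()); // true
    }
}
```

Use `getOptional` when the code needs to distinguish "not configured" from "configured to the default value", and `get` when only the effective value matters.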
## Configuration Examples

### Custom SQL Parser Configuration

```scala
import org.apache.calcite.avatica.util.Casing
import org.apache.calcite.config.Lex
import org.apache.calcite.sql.parser.SqlParser
import org.apache.flink.table.planner.calcite.CalciteConfig

// Configure the parser for a specific SQL dialect (MySQL-style lexing here)
val mysqlParserConfig = SqlParser.Config.DEFAULT
  .withLex(Lex.MYSQL)
  .withUnquotedCasing(Casing.UNCHANGED)
  .withQuotedCasing(Casing.UNCHANGED)
  .withCaseSensitive(false)

val calciteConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(mysqlParserConfig)
  .build()
```
### Custom Function Registration

```scala
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.fun.SqlStdOperatorTable
import org.apache.calcite.sql.util.ChainedSqlOperatorTable
import org.apache.flink.table.planner.calcite.CalciteConfig

// Create custom operator table with additional functions
// (myCustomFunctions is a SqlOperatorTable defined elsewhere)
val customOperatorTable = ChainedSqlOperatorTable.of(
  SqlStdOperatorTable.instance(),
  myCustomFunctions
)

val calciteConfig = CalciteConfig.createBuilder()
  .addSqlOperatorTable(customOperatorTable)
  .build()
```
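### Custom Optimization Programs

```scala
import org.apache.flink.table.planner.calcite.CalciteConfig
import org.apache.flink.table.planner.plan.optimize.program._

// Start from the default batch program. In the Flink sources, buildProgram takes the
// planner configuration rather than a list of phases; the exact parameter type
// (for example Configuration or ReadableConfig) depends on the Flink version.
val customBatchProgram: FlinkChainedProgram[BatchOptimizeContext] =
  FlinkBatchProgram.buildProgram(tableConfig)

// Add custom optimization rules as an extra phase after the built-in ones
// (myCustomOptimizationRules is a FlinkOptimizeProgram[BatchOptimizeContext] defined elsewhere)
customBatchProgram.addLast("custom_rules", myCustomOptimizationRules)

val calciteConfig = CalciteConfig.createBuilder()
  .replaceBatchProgram(customBatchProgram)
  .build()
```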
## Integration with TableConfig

A `CalciteConfig` is attached to a table environment through Flink's `TableConfig`:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.calcite.CalciteConfig;

// Build the planner configuration (customParserConfig is defined elsewhere)
CalciteConfig calciteConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(customParserConfig)
    .build();

// Create the table environment
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().build()
);

// Set planner configuration on the environment's table config.
// TableConfig holds the planner config separately from its underlying Configuration,
// so passing tableConfig.getConfiguration() to withConfiguration(...) would not carry it over.
tableEnv.getConfig().setPlannerConfig(calciteConfig);
```
## Configuration Best Practices

### Immutable Configuration Pattern

```scala
// Build configuration once and reuse
val baseConfig = CalciteConfig.createBuilder()
  .replaceSqlParserConfig(standardParserConfig)
  .build()

// Create variations from base configuration
val batchConfig = CalciteConfig.createBuilder(baseConfig)
  .replaceBatchProgram(optimizedBatchProgram)
  .build()

val streamConfig = CalciteConfig.createBuilder(baseConfig)
  .replaceStreamProgram(optimizedStreamProgram)
  .build()
```
### Conditional Configuration

```scala
val configBuilder = CalciteConfig.createBuilder()

// Add configurations conditionally
if (enableCustomFunctions) {
  configBuilder.addSqlOperatorTable(myCustomFunctions)
}

if (useOptimizedBatchRules) {
  configBuilder.replaceBatchProgram(optimizedBatchProgram)
}

val calciteConfig = configBuilder.build()
```
### Environment-Specific Configuration

```java
// Development configuration
CalciteConfig devConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(lenientParserConfig) // More permissive parsing
    .build();

// Production configuration
CalciteConfig prodConfig = CalciteConfig.createBuilder()
    .replaceSqlParserConfig(strictParserConfig) // Strict parsing
    .replaceBatchProgram(highlyOptimizedProgram) // Aggressive optimization
    .build();

// Use appropriate config based on environment
CalciteConfig config = isProduction ? prodConfig : devConfig;
```
## Configuration Validation

The configuration system includes validation to ensure consistency:

```java
try {
    CalciteConfig config = CalciteConfig.createBuilder()
        .replaceSqlParserConfig(parserConfig)
        .build();
} catch (IllegalArgumentException e) {
    // Handle invalid configuration
    logger.error("Invalid parser configuration: " + e.getMessage());
}
```

Configuration validation covers:
- **Parser Compatibility**: Ensuring parser config matches expected SQL dialect
- **Operator Consistency**: Validating custom operators are compatible
- **Program Validity**: Checking optimization programs are well-formed
- **Resource Limits**: Ensuring configuration values are within acceptable ranges