or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.mdconnector-integration.mdcore-planning.mdenums-constants.mdexecution-nodes.mdfactory-classes.mdindex.mdtype-system.md

configuration.mddocs/

0

# Configuration and Builders

1

2

Configuration classes provide sophisticated control over Calcite behavior and planner settings. The configuration system allows customization of SQL parsing, query optimization, and operator handling through a builder pattern API.

3

4

## Package Information

5

6

```scala

7

import org.apache.flink.table.planner.calcite.{CalciteConfig, CalciteConfigBuilder}

8

import org.apache.flink.table.planner.delegation.PlannerConfiguration

9

import org.apache.calcite.sql.parser.SqlParser

10

import org.apache.calcite.sql.SqlOperatorTable

11

import org.apache.calcite.config.CalciteConnectionConfig

12

```

13

14

```java

15

import org.apache.flink.table.planner.calcite.CalciteConfig;

16

import org.apache.flink.table.planner.delegation.PlannerConfiguration;

17

import org.apache.flink.configuration.ReadableConfig;

18

import org.apache.flink.configuration.Configuration;

19

```

20

21

## Capabilities

22

23

### CalciteConfig

24

25

Core configuration trait for customizing Apache Calcite behavior within the Flink planner.

26

27

```scala { .api }

28

trait CalciteConfig {

29

30

// SQL Parser Configuration

31

def getSqlParserConfig: Option[SqlParser.Config]

32

33

// SQL Operator Tables

34

def getSqlOperatorTable: Option[SqlOperatorTable]

35

36

// Optimization Programs

37

def getBatchProgram: Option[FlinkChainedProgram[BatchOptimizeContext]]

38

def getStreamProgram: Option[FlinkChainedProgram[StreamOptimizeContext]]

39

40

// Connection Configuration

41

def getSqlToRelConverterConfig: Option[SqlToRelConverter.Config]

42

def getCalciteConnectionConfig: Option[CalciteConnectionConfig]

43

}

44

```

45

46

The `CalciteConfig` trait provides access to all major customization points in the Calcite integration:

47

48

- **Parser Configuration**: Customize SQL dialect, keywords, and parsing behavior

49

- **Operator Tables**: Add custom functions and operators beyond built-in ones

50

- **Optimization Programs**: Define custom optimization rule sequences for batch and streaming

51

- **Converter Configuration**: Control SQL-to-relational algebra conversion settings

52

53

### CalciteConfig Builder

54

55

Factory methods and builder for creating `CalciteConfig` instances.

56

57

```scala { .api }

58

object CalciteConfig {

59

def createBuilder(): CalciteConfigBuilder

60

def createBuilder(calciteConfig: CalciteConfig): CalciteConfigBuilder

61

}

62

63

class CalciteConfigBuilder {

64

65

// SQL Parser Customization

66

def replaceSqlParserConfig(sqlParserConfig: SqlParser.Config): CalciteConfigBuilder

67

68

// SQL Operator Table Customization

69

def replaceSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder

70

def addSqlOperatorTable(sqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder

71

72

// Optimization Program Customization

73

def replaceBatchProgram(program: FlinkChainedProgram[BatchOptimizeContext]): CalciteConfigBuilder

74

def replaceStreamProgram(program: FlinkChainedProgram[StreamOptimizeContext]): CalciteConfigBuilder

75

76

// Converter Configuration

77

def replaceSqlToRelConverterConfig(config: SqlToRelConverter.Config): CalciteConfigBuilder

78

def replaceCalciteConnectionConfig(config: CalciteConnectionConfig): CalciteConfigBuilder

79

80

// Build final configuration

81

def build(): CalciteConfig

82

}

83

```

84

85

**Key Builder Methods:**

86

87

- **`replaceSqlParserConfig(...)`**: Completely replaces the default parser configuration

88

- **`addSqlOperatorTable(...)`**: Adds custom operators to the built-in operator table

89

- **`replaceSqlOperatorTable(...)`**: Completely replaces the built-in operator table

90

- **`replaceBatchProgram(...)`**: Replaces the default batch optimization program

91

- **`replaceStreamProgram(...)`**: Replaces the default stream optimization program

92

93

**Usage Example:**

94

95

```scala

96

import org.apache.flink.table.planner.calcite.CalciteConfig

97

import org.apache.calcite.sql.parser.SqlParser

98

import org.apache.calcite.sql.fun.SqlStdOperatorTable

99

100

// Create custom parser configuration

101

val parserConfig = SqlParser.Config.DEFAULT

102

.withLex(Lex.MYSQL)

103

.withIdentifierMaxLength(256)

104

105

// Create custom operator table

106

val customOperatorTable = // your custom operator table

107

108

// Build CalciteConfig with customizations

109

val calciteConfig = CalciteConfig.createBuilder()

110

.replaceSqlParserConfig(parserConfig)

111

.addSqlOperatorTable(customOperatorTable)

112

.replaceBatchProgram(myCustomBatchProgram)

113

.build()

114

115

// Use with table environment

116

val tableConfig = new TableConfig()

117

tableConfig.setPlannerConfig(calciteConfig)

118

```

119

120

### PlannerConfiguration

121

122

Unified configuration access for the planner module, implementing Flink's configuration system.

123

124

```java { .api }

125

public final class PlannerConfiguration implements ReadableConfig {

126

127

// Constructor

128

public PlannerConfiguration(

129

Configuration configuration,

130

ClassLoader classLoader,

131

ModuleManager moduleManager,

132

CatalogManager catalogManager,

133

FunctionCatalog functionCatalog

134

);

135

136

// Configuration access methods

137

public <T> T get(ConfigOption<T> option);

138

public <T> Optional<T> getOptional(ConfigOption<T> option);

139

public Configuration getConfiguration();

140

141

// Component access methods

142

public ClassLoader getClassLoader();

143

public ModuleManager getModuleManager();

144

public CatalogManager getCatalogManager();

145

public FunctionCatalog getFunctionCatalog();

146

}

147

```

148

149

**Key Features:**

150

- **Unified Access**: Single point of access for all planner configuration

151

- **Type Safety**: Strongly typed configuration options through `ConfigOption<T>`

152

- **Component Integration**: Direct access to catalog, modules, and function registry

153

- **Immutable Design**: Configuration objects are immutable after creation

154

155

**Usage Example:**

156

157

```java

158

import org.apache.flink.table.planner.delegation.PlannerConfiguration;

159

import org.apache.flink.configuration.Configuration;

160

import org.apache.flink.table.api.config.TableConfigOptions;

161

162

// Create configuration

163

Configuration config = new Configuration();

164

config.setString(TableConfigOptions.TABLE_SQL_DIALECT, "hive");

165

166

// Create planner configuration

167

PlannerConfiguration plannerConfig = new PlannerConfiguration(

168

config,

169

classLoader,

170

moduleManager,

171

catalogManager,

172

functionCatalog

173

);

174

175

// Access configuration values

176

String sqlDialect = plannerConfig.get(TableConfigOptions.TABLE_SQL_DIALECT);

177

Optional<Duration> idleTimeout = plannerConfig.getOptional(TableConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);

178

179

// Access components

180

CatalogManager catalogManager = plannerConfig.getCatalogManager();

181

FunctionCatalog functionCatalog = plannerConfig.getFunctionCatalog();

182

```

183

184

## Configuration Examples

185

186

### Custom SQL Parser Configuration

187

188

```scala

189

import org.apache.calcite.sql.parser.SqlParser

190

import org.apache.calcite.avatica.util.Casing

191

import org.apache.calcite.sql.parser.SqlParser.Config

192

193

// Configure parser for different SQL dialects

194

val mysqlParserConfig = SqlParser.Config.DEFAULT

195

.withLex(Lex.MYSQL)

196

.withUnquotedCasing(Casing.UNCHANGED)

197

.withQuotedCasing(Casing.UNCHANGED)

198

.withCaseSensitive(false)

199

200

val calciteConfig = CalciteConfig.createBuilder()

201

.replaceSqlParserConfig(mysqlParserConfig)

202

.build()

203

```

204

205

### Custom Function Registration

206

207

```scala

208

import org.apache.calcite.sql.SqlOperatorTable

209

import org.apache.calcite.sql.fun.SqlStdOperatorTable

210

import org.apache.calcite.sql.util.ChainedSqlOperatorTable

211

212

// Create custom operator table with additional functions

213

val customOperatorTable = ChainedSqlOperatorTable.of(

214

SqlStdOperatorTable.instance(),

215

myCustomFunctions

216

)

217

218

val calciteConfig = CalciteConfig.createBuilder()

219

.addSqlOperatorTable(customOperatorTable)

220

.build()

221

```

222

223

### Custom Optimization Programs

224

225

```scala

226

import org.apache.flink.table.planner.plan.optimize.program._

227

228

// Create custom optimization program for batch processing

229

val customBatchProgram = FlinkBatchProgram.buildProgram(

230

// Add custom optimization rules

231

Seq(

232

FlinkBatchProgram.OPTIMIZE_REWRITE,

233

FlinkBatchProgram.OPTIMIZE_JOIN_REORDER,

234

myCustomOptimizationRules

235

)

236

)

237

238

val calciteConfig = CalciteConfig.createBuilder()

239

.replaceBatchProgram(customBatchProgram)

240

.build()

241

```

242

243

## Integration with TableConfig

244

245

The configuration integrates with Flink's `TableConfig` for global table environment settings:

246

247

```java

248

import org.apache.flink.table.api.TableConfig;

249

import org.apache.flink.table.planner.calcite.CalciteConfig;

250

251

// Set planner configuration on table config

252

TableConfig tableConfig = new TableConfig();

253

CalciteConfig calciteConfig = CalciteConfig.createBuilder()

254

.replaceSqlParserConfig(customParserConfig)

255

.build();

256

257

tableConfig.setPlannerConfig(calciteConfig);

258

259

// Use with table environment

260

TableEnvironment tableEnv = TableEnvironment.create(

261

EnvironmentSettings.newInstance()

262

.withConfiguration(tableConfig.getConfiguration())

263

.build()

264

);

265

```

266

267

## Configuration Best Practices

268

269

### Immutable Configuration Pattern

270

271

```scala

272

// Build configuration once and reuse

273

val baseConfig = CalciteConfig.createBuilder()

274

.replaceSqlParserConfig(standardParserConfig)

275

.build()

276

277

// Create variations from base configuration

278

val batchConfig = CalciteConfig.createBuilder(baseConfig)

279

.replaceBatchProgram(optimizedBatchProgram)

280

.build()

281

282

val streamConfig = CalciteConfig.createBuilder(baseConfig)

283

.replaceStreamProgram(optimizedStreamProgram)

284

.build()

285

```

286

287

### Conditional Configuration

288

289

```scala

290

val configBuilder = CalciteConfig.createBuilder()

291

292

// Add configurations conditionally

293

if (enableCustomFunctions) {

294

configBuilder.addSqlOperatorTable(myCustomFunctions)

295

}

296

297

if (useOptimizedBatchRules) {

298

configBuilder.replaceBatchProgram(optimizedBatchProgram)

299

}

300

301

val calciteConfig = configBuilder.build()

302

```

303

304

### Environment-Specific Configuration

305

306

```java

307

// Development configuration

308

CalciteConfig devConfig = CalciteConfig.createBuilder()

309

.replaceSqlParserConfig(lenientParserConfig) // More permissive parsing

310

.build();

311

312

// Production configuration

313

CalciteConfig prodConfig = CalciteConfig.createBuilder()

314

.replaceSqlParserConfig(strictParserConfig) // Strict parsing

315

.replaceBatchProgram(highlyOptimizedProgram) // Aggressive optimization

316

.build();

317

318

// Use appropriate config based on environment

319

CalciteConfig config = isProduction ? prodConfig : devConfig;

320

```

321

322

## Configuration Validation

323

324

The configuration system includes validation to ensure consistency:

325

326

```java

327

try {

328

CalciteConfig config = CalciteConfig.createBuilder()

329

.replaceSqlParserConfig(parserConfig)

330

.build();

331

} catch (IllegalArgumentException e) {

332

// Handle invalid configuration

333

logger.error("Invalid parser configuration: " + e.getMessage());

334

}

335

```

336

337

Configuration validation covers:

338

- **Parser Compatibility**: Ensuring parser config matches expected SQL dialect

339

- **Operator Consistency**: Validating custom operators are compatible

340

- **Program Validity**: Checking optimization programs are well-formed

341

- **Resource Limits**: Ensuring configuration values are within acceptable ranges