
# Configuration Management

Comprehensive configuration system providing fine-grained control over connector behavior, performance tuning, and feature enablement. The configuration options cover reading, writing, streaming, lookup joins, and advanced optimization scenarios.

## Capabilities

### Core Configuration Options

Central configuration class containing all Hive connector-specific options for controlling behavior and performance.

```java { .api }
/**
 * Configuration options for Hive connector behavior and performance tuning.
 * All options can be set in the Flink configuration or as table properties.
 */
public class HiveOptions {

    // Reading Configuration Options

    /**
     * Whether to fall back to the Hadoop MapReduce RecordReader for file reading.
     * Default: false (use native vectorized readers when possible)
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;

    /**
     * Whether to read files in subdirectories of partitions.
     * Default: false
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;

    /**
     * Whether to infer source parallelism based on the number of files/splits.
     * Default: true
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;

    /**
     * Maximum size of a single split for reading.
     * Default: 128MB
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_SIZE;

    /**
     * Estimated cost of opening a file (used for split calculation).
     * Default: 4MB
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;

    // Writing Configuration Options

    /**
     * Whether to fall back to the Hadoop MapReduce OutputFormat for writing.
     * Default: false (use native writers when possible)
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;

    /**
     * Whether to sort records by dynamic partition columns before writing.
     * Default: false
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE;

    /**
     * Whether to automatically gather table statistics after writing.
     * Default: false
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;

    /**
     * Partition commit policy for determining when partitions are ready.
     * Options: "metastore", "success-file", "metastore,success-file"
     * Default: (not set; partitions are committed immediately)
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;

    // Streaming Configuration Options

    /**
     * Whether to enable streaming source mode for continuous partition monitoring.
     * Default: false
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;

    /**
     * Which partitions to include in streaming mode.
     * Options: "all" (all partitions), "latest" (only the latest partition)
     * Default: "all"
     */
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE;

    /**
     * Interval for monitoring new partitions in streaming mode.
     * Default: 1 minute
     */
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;

    /**
     * Starting point for consuming partitions in streaming mode.
     * Format: "yyyy-MM-dd HH:mm:ss" or a partition name pattern
     * Default: (consume from the latest available partition)
     */
    public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;

    /**
     * Ordering strategy for processing partitions in streaming mode.
     */
    public static final ConfigOption<PartitionOrder> STREAMING_SOURCE_PARTITION_ORDER;

    // Lookup Join Configuration Options

    /**
     * Time-to-live for cached lookup results.
     * Default: 60 minutes
     */
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;

    /**
     * Maximum number of entries in the lookup cache.
     * Default: 10000
     */
    public static final ConfigOption<Integer> LOOKUP_JOIN_CACHE_MAX_SIZE;

    // Partition Ordering Enum

    /**
     * Enumeration of partition ordering strategies in streaming mode.
     */
    public enum PartitionOrder {
        /** Order by partition creation time in the Hive metastore */
        CREATE_TIME,
        /** Order by partition time extracted from the partition name */
        PARTITION_TIME,
        /** Order by partition name alphabetically */
        PARTITION_NAME
    }
}
```

### Catalog Configuration Options

Configuration options specific to HiveCatalog creation and behavior.

```java { .api }
/**
 * Configuration options for the HiveCatalog factory.
 */
public class HiveCatalogFactoryOptions {

    /**
     * Path to the directory containing hive-site.xml and other Hive configuration files.
     * Required for catalog creation.
     */
    public static final ConfigOption<String> HIVE_CONF_DIR;

    /**
     * Hive version for compatibility and feature support.
     * Default: "3.1.2"
     */
    public static final ConfigOption<String> HIVE_VERSION;

    /**
     * Path to the directory containing Hadoop configuration files (core-site.xml, hdfs-site.xml).
     * Optional, but recommended for HDFS and other Hadoop filesystem access.
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR;

    /**
     * Default database name to use when none is specified.
     * Default: "default"
     */
    public static final ConfigOption<String> DEFAULT_DATABASE;
}
```

### HiveServer2 Endpoint Configuration

Configuration options for the HiveServer2-compatible endpoint in the Flink SQL Gateway.

```java { .api }
/**
 * Configuration options for the HiveServer2 endpoint.
 */
public class HiveServer2EndpointConfigOptions {

    // Server Configuration

    /**
     * Host address for the Thrift server.
     * Default: "localhost"
     */
    public static final ConfigOption<String> THRIFT_HOST;

    /**
     * Port number for the Thrift server.
     * Default: 10000
     */
    public static final ConfigOption<Integer> THRIFT_PORT;

    /**
     * Minimum number of worker threads for handling client connections.
     * Default: 5
     */
    public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MIN;

    /**
     * Maximum number of worker threads for handling client connections.
     * Default: 500
     */
    public static final ConfigOption<Integer> THRIFT_WORKER_THREADS_MAX;

    /**
     * Maximum message size for Thrift communication.
     * Default: 104857600 (100MB)
     */
    public static final ConfigOption<MemorySize> THRIFT_MAX_MESSAGE_SIZE;

    // Session Configuration

    /**
     * Default catalog name for new sessions.
     * Default: "default_catalog"
     */
    public static final ConfigOption<String> CATALOG_DEFAULT_NAME;

    /**
     * Default database name for new sessions.
     * Default: "default"
     */
    public static final ConfigOption<String> CATALOG_DEFAULT_DATABASE;

    /**
     * Module configuration for built-in function access.
     */
    public static final ConfigOption<String> MODULE_NAME;
}
```

## Configuration Usage Patterns

### SQL DDL Configuration

```sql
-- Table-level configuration
CREATE TABLE hive_streaming_source (
    id BIGINT,
    data STRING,
    event_time TIMESTAMP(3),
    partition_hour STRING
) PARTITIONED BY (partition_hour)
WITH (
    'connector' = 'hive',

    -- Streaming configuration
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '5 min',
    'streaming-source.consume-start-offset' = '2023-01-01 00:00:00',
    'streaming-source.partition-order' = 'CREATE_TIME',

    -- Reading optimization
    'table.exec.hive.infer-source-parallelism' = 'true',
    'table.exec.hive.split-max-size' = '256MB',

    -- Lookup join caching
    'lookup.join.cache.ttl' = '30 min',
    'lookup.join.cache.max-size' = '5000'
);

-- Sink table configuration
CREATE TABLE hive_sink (
    id BIGINT,
    processed_data STRING,
    partition_date STRING
) PARTITIONED BY (partition_date)
WITH (
    'connector' = 'hive',

    -- Writing configuration
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    'table.exec.hive.sink.sort-by-dynamic-partition.enable' = 'true',
    'table.exec.hive.sink.statistic-auto-gather.enable' = 'true',

    -- Performance tuning
    'table.exec.hive.fallback-mapred-writer' = 'false'
);

-- Catalog configuration
CREATE CATALOG production_hive WITH (
    'type' = 'hive',
    'hive-conf-dir' = '/opt/hive/conf',
    'hadoop-conf-dir' = '/opt/hadoop/conf',
    'hive-version' = '3.1.2',
    'default-database' = 'analytics'
);
```

### Programmatic Configuration

```java
// Flink configuration for global settings
Configuration flinkConfig = new Configuration();

// Reading optimization
flinkConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(256));
flinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(8));

// Writing optimization
flinkConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
flinkConfig.setBoolean(HiveOptions.TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE, true);

// Streaming configuration
flinkConfig.setBoolean(HiveOptions.STREAMING_SOURCE_ENABLE, true);
flinkConfig.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");
flinkConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(5));

// Create environment with configuration
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .withConfiguration(flinkConfig)
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
```

### Performance Tuning Guidelines

#### Reading Performance

```java
// Optimize for large files with few splits
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(512));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(16));

// Optimize for many small files
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(64));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(1));

// Enable parallelism inference for optimal resource usage
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
```

#### Streaming Performance

```java
// High-frequency partition updates
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "latest");

// Batch processing of historical data
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(30));
config.setString(HiveOptions.STREAMING_SOURCE_PARTITION_INCLUDE, "all");
config.setString(HiveOptions.STREAMING_SOURCE_CONSUME_START_OFFSET, "2023-01-01 00:00:00");
```

#### Writing Performance

```java
// Enable dynamic partition sorting for better compression
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_SINK_SORT_BY_DYNAMIC_PARTITION_ENABLE, true);

// Configure partition commit policies
config.setString(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");

// Enable automatic statistics gathering
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE, true);
```

### Environment-Specific Configuration

#### Development Environment

```properties
# Relaxed settings for development
table.exec.hive.infer-source-parallelism=true
table.exec.hive.split-max-size=64MB
streaming-source.monitor-interval=30s
lookup.join.cache.ttl=5min
```

#### Production Environment

```properties
# Optimized settings for production
table.exec.hive.infer-source-parallelism=true
table.exec.hive.split-max-size=256MB
table.exec.hive.file-open-cost=8MB
streaming-source.monitor-interval=5min
streaming-source.partition-order=CREATE_TIME
sink.partition-commit.policy.kind=metastore,success-file
table.exec.hive.sink.statistic-auto-gather.enable=true
lookup.join.cache.ttl=60min
lookup.join.cache.max-size=50000
```

### Configuration Validation and Troubleshooting

Common configuration issues and their solutions:

**Split Size Issues:**
```java
// Problem: too many small splits causing overhead
// Solution: increase the split size and file open cost
config.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_SIZE, MemorySize.ofMebiBytes(256));
config.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.ofMebiBytes(8));
```

**Streaming Lag Issues:**
```java
// Problem: partitions not being detected quickly enough
// Solution: reduce the monitor interval and optimize partition ordering
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofMinutes(1));
config.set(HiveOptions.STREAMING_SOURCE_PARTITION_ORDER, PartitionOrder.CREATE_TIME);
```

**Memory Issues in Lookup Joins:**
```java
// Problem: lookup cache consuming too much memory
// Solution: reduce the cache size and TTL
config.set(HiveOptions.LOOKUP_JOIN_CACHE_MAX_SIZE, 5000);
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(30));
```