
# Configuration and Options

The Flink Hive connector provides a comprehensive configuration system for customizing connector behavior, tuning performance, and adapting to specific environments. The configuration options control source/sink behavior, catalog integration, module loading, and performance optimizations.

## Capabilities

### HiveOptions

Core configuration options for Hive connector behavior and performance tuning.

```java { .api }
/**
 * Configuration options for Hive connector behavior.
 * Controls reader/writer fallback, streaming mode, caching, and partition handling.
 */
public class HiveOptions {

    /**
     * Whether to fall back to the MapRed reader for compatibility.
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;

    /**
     * Whether to fall back to the MapRed writer for compatibility.
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;

    /**
     * Enable streaming source mode for continuous monitoring.
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;

    /**
     * Monitoring interval for streaming source partition discovery.
     * Default: 1 minute
     * Type: Duration
     */
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;

    /**
     * Cache TTL for lookup join operations.
     * Default: no caching
     * Type: Duration
     */
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;

    /**
     * Whether to automatically infer source parallelism.
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;

    /**
     * Partition commit policy for sinks.
     * Values: "metastore", "success-file", "metastore,success-file"
     * Default: "metastore"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;

    /**
     * Trigger for partition commit.
     * Values: "partition-time", "process-time"
     * Default: "process-time"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_TRIGGER;

    /**
     * Delay before committing partitions.
     * Default: 0 (immediate)
     * Type: Duration
     */
    public static final ConfigOption<Duration> SINK_PARTITION_COMMIT_DELAY;

    /**
     * Watermark timezone for the partition-time commit trigger.
     * Default: system timezone
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE;

    /**
     * Custom success file name for partition commit.
     * Default: "_SUCCESS"
     * Type: String
     */
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;

    /**
     * Enable reading partitions with subdirectories.
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;

    /**
     * Maximum inferred source parallelism.
     * Default: 1000
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;

    /**
     * Thread count for loading partition splits.
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;

    /**
     * Maximum split size for Hive table reading.
     * Default: 128MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES;

    /**
     * Estimated cost to open a file, used in split calculation.
     * Default: 4MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;

    /**
     * Thread count for calculating partition sizes.
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM;

    /**
     * Enable dynamic partition grouping.
     * Default: false
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED;

    /**
     * Thread count for reading table/partition statistics.
     * Default: 3
     * Type: Integer
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM;

    /**
     * Average size threshold for small-file compaction.
     * Default: 16MB
     * Type: MemorySize
     */
    public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE;

    /**
     * Enable automatic statistics gathering for the sink.
     * Default: true
     * Type: Boolean
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;
}
```

**Configuration Usage Examples:**

```java
import java.time.Duration;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Configure Hive connector options
Configuration config = new Configuration();

// Enable streaming mode with 30-second partition monitoring
config.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(30));

// Configure lookup join caching
config.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(10));

// Enable automatic parallelism inference
config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Configure the partition commit policy
config.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
config.set(HiveOptions.SINK_PARTITION_COMMIT_TRIGGER, "partition-time");
config.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofHours(1));

// Apply the configuration to a table environment
TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance()
        .withConfiguration(config)
        .build()
);
```
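
The same options can also be adjusted on an existing `TableEnvironment` through its `TableConfig`, using either the typed `ConfigOption` constants or their string keys. A minimal sketch, assuming the `tableEnv` created above:

```java
// Set options on an already-created table environment via its TableConfig
Configuration tableConf = tableEnv.getConfig().getConfiguration();
tableConf.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
tableConf.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(10));

// Options can also be set by their string keys, e.g. when loaded from properties files
tableConf.setString("table.exec.hive.fallback-mapred-reader", "false");

// Read an option back, falling back to its default when unset
boolean streamingEnabled = tableConf.get(HiveOptions.STREAMING_SOURCE_ENABLE);
```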

### HiveCatalogFactoryOptions

Configuration options for HiveCatalog creation and metastore connection.

```java { .api }
/**
 * Configuration options for HiveCatalogFactory.
 * Controls catalog metadata, Hive configuration, and connection settings.
 */
public class HiveCatalogFactoryOptions {

    /**
     * Default database name for the catalog.
     * Default: "default"
     * Type: String
     */
    public static final ConfigOption<String> DEFAULT_DATABASE;

    /**
     * Directory containing Hive configuration files (hive-site.xml).
     * Default: null (use classpath)
     * Type: String
     */
    public static final ConfigOption<String> HIVE_CONF_DIR;

    /**
     * Hive version for compatibility.
     * Supported: "2.3.4", "2.3.6", "2.3.9", "3.1.2", "3.1.3"
     * Default: auto-detected
     * Type: String
     */
    public static final ConfigOption<String> HIVE_VERSION;

    /**
     * Directory containing Hadoop configuration files (core-site.xml, hdfs-site.xml).
     * Default: null (use classpath)
     * Type: String
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR;
}
```

**Catalog Configuration Examples:**

```java
// Programmatic catalog configuration
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put("type", "hive");
catalogProperties.put("default-database", "analytics");
catalogProperties.put("hive-conf-dir", "/etc/hive/conf");
catalogProperties.put("hadoop-conf-dir", "/etc/hadoop/conf");
catalogProperties.put("hive-version", "2.3.9");

// Create a catalog from the properties
HiveCatalog catalog = new HiveCatalog(
    "production_hive",
    catalogProperties.get("default-database"),
    catalogProperties.get("hive-conf-dir"),
    catalogProperties.get("hadoop-conf-dir"),
    catalogProperties.get("hive-version")
);

// SQL DDL catalog creation
tableEnv.executeSql("""
    CREATE CATALOG production_hive WITH (
      'type' = 'hive',
      'default-database' = 'analytics',
      'hive-conf-dir' = '/etc/hive/conf',
      'hadoop-conf-dir' = '/etc/hadoop/conf',
      'hive-version' = '2.3.9'
    )
    """);
```
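
Once created, the catalog still has to be registered with the table environment before its databases and tables are visible. A short sketch, assuming the `catalog` and `tableEnv` objects from the examples above:

```java
// Register the catalog and make it the current one
tableEnv.registerCatalog("production_hive", catalog);
tableEnv.useCatalog("production_hive");
tableEnv.useDatabase("analytics");

// Tables in the Hive metastore can now be referenced directly
tableEnv.executeSql("SHOW TABLES").print();
```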

### HiveModuleOptions

Configuration options for HiveModule function loading and compatibility.

```java { .api }
/**
 * Configuration options for HiveModule.
 * Controls function loading and Hive version compatibility.
 */
public class HiveModuleOptions {

    /**
     * Hive version for function compatibility.
     * Default: auto-detected from classpath
     * Type: String
     */
    public static final ConfigOption<String> HIVE_VERSION;
}
```

**Module Configuration Examples:**

```java
// Module configuration via factory
Map<String, String> moduleProperties = new HashMap<>();
moduleProperties.put("type", "hive");
moduleProperties.put("hive-version", "2.3.9");

// Load module with specific version
HiveModule module = new HiveModule("2.3.9");
tableEnv.loadModule("hive", module);

// SQL DDL module loading
tableEnv.executeSql("""
    LOAD MODULE hive WITH (
      'type' = 'hive',
      'hive-version' = '2.3.9'
    )
    """);
```
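
After the module is loaded, Hive built-in functions resolve alongside Flink's core functions, and module order determines resolution priority. A hedged sketch using standard `TableEnvironment` APIs (the `events` table and `payload` column are illustrative):

```java
// Inspect which modules are loaded and in what resolution order
String[] modules = tableEnv.listModules();   // e.g. [core, hive]

// Hive built-in functions (here: get_json_object) can now be used in queries
tableEnv.executeSql(
    "SELECT get_json_object(payload, '$.user_id') FROM events").print();

// Adjust resolution order so Hive functions take precedence over core functions
tableEnv.useModules("hive", "core");
```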

## Performance Configuration

### Source Performance Options

```java
// Optimize source performance
Configuration sourceConfig = new Configuration();

// Enable parallelism inference for optimal performance
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);

// Configure reader fallback (disable for better performance)
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);

// Set the default parallelism for table operators
sourceConfig.setString("table.exec.resource.default-parallelism", "8");

// Configure memory for large datasets
sourceConfig.setString("taskmanager.memory.process.size", "4g");
sourceConfig.setString("taskmanager.memory.flink.size", "3g");
```
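
Split-related options from `HiveOptions` can be tuned in the same way when reads are dominated by many small files or a few very large ones. A sketch using the options listed above (the values are illustrative, not recommendations):

```java
// Tune split generation: larger maximum split size, higher assumed file-open cost
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_SPLIT_MAX_BYTES, MemorySize.parse("256mb"));
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_FILE_OPEN_COST, MemorySize.parse("8mb"));

// Use more threads when tables have many partitions
sourceConfig.set(HiveOptions.TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM, 6);
```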

### Sink Performance Options

```java
// Optimize sink performance and reliability
Configuration sinkConfig = new Configuration();

// Configure partition commit for reliability
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_TRIGGER, "partition-time");
sinkConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofMinutes(5));

// Disable writer fallback for better performance
sinkConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

// Configure checkpointing for exactly-once semantics
sinkConfig.setString("execution.checkpointing.interval", "30s");
sinkConfig.setString("execution.checkpointing.mode", "EXACTLY_ONCE");
```

### Streaming Configuration

```java
// Configure streaming mode for real-time processing
Configuration streamingConfig = new Configuration();

// Enable streaming source
streamingConfig.set(HiveOptions.STREAMING_SOURCE_ENABLE, true);
streamingConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(10));

// Configure lookup join caching
streamingConfig.set(HiveOptions.LOOKUP_JOIN_CACHE_TTL, Duration.ofMinutes(15));

// Set watermark configuration
streamingConfig.setString("table.exec.source.idle-timeout", "30s");
streamingConfig.setString("pipeline.time-characteristic", "EventTime");
```
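
For per-query control in streaming pipelines, streaming-source properties can also be supplied as dynamic table options using Flink SQL's `OPTIONS` hint. A sketch under the assumption that dynamic table options are enabled in the session and that the catalog, table, and column names are illustrative:

```java
// Consume a Hive table as an unbounded stream for this query only,
// overriding the table's stored properties via dynamic table options
Table events = tableEnv.sqlQuery("""
    SELECT user_id, event_type, event_time
    FROM hive_catalog.analytics.user_events
      /*+ OPTIONS('streaming-source.enable'='true',
                  'streaming-source.monitor-interval'='10s') */
    """);
```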

## Environment-Specific Configuration

### Production Environment

```java
// Production-ready configuration
Configuration prodConfig = new Configuration();

// Reliability settings
prodConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore,success-file");
prodConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofMinutes(10));

// Performance settings
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
prodConfig.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

// Memory and resource settings
prodConfig.setString("taskmanager.numberOfTaskSlots", "4");
prodConfig.setString("taskmanager.memory.process.size", "8g");
prodConfig.setString("jobmanager.memory.process.size", "2g");

// Checkpointing for fault tolerance
prodConfig.setString("execution.checkpointing.interval", "60s");
prodConfig.setString("state.backend", "rocksdb");
prodConfig.setString("state.checkpoints.dir", "hdfs://namenode:9000/flink/checkpoints");
prodConfig.setString("state.savepoints.dir", "hdfs://namenode:9000/flink/savepoints");
```

### Development Environment

```java
// Development-friendly configuration
Configuration devConfig = new Configuration();

// Fast feedback settings
devConfig.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, Duration.ofSeconds(5));
devConfig.set(HiveOptions.SINK_PARTITION_COMMIT_DELAY, Duration.ofSeconds(10));

// Simplified commit policy
devConfig.set(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND, "metastore");

// Reduced resource usage
devConfig.setString("taskmanager.numberOfTaskSlots", "2");
devConfig.setString("taskmanager.memory.process.size", "2g");

// Local state backend
devConfig.setString("state.backend", "filesystem");
devConfig.setString("state.checkpoints.dir", "file:///tmp/flink-checkpoints");
```
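
Choosing between the two profiles is typically driven by the deployment environment. One hedged way to wire it up, assuming the `prodConfig` and `devConfig` objects built above (the `ENV` variable name is an assumption, not a Flink convention):

```java
// Choose the configuration profile from an environment variable
boolean isProduction = "production".equalsIgnoreCase(System.getenv("ENV"));
Configuration envConfig = isProduction ? prodConfig : devConfig;

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().withConfiguration(envConfig).build());
```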

## SQL Configuration

### Table Properties Configuration

```sql
-- Configure a Hive table with connector properties
CREATE TABLE streaming_events (
  user_id STRING,
  event_type STRING,
  event_time TIMESTAMP(3),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) PARTITIONED BY (event_date STRING)
WITH (
  'connector' = 'hive',
  'streaming-source.enable' = 'true',
  'streaming-source.monitor-interval' = '10s',
  'sink.partition-commit.trigger' = 'partition-time',
  'sink.partition-commit.delay' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore,success-file'
);

-- Configure an external table with custom properties
CREATE TABLE external_logs (
  log_level STRING,
  message STRING,
  log_time TIMESTAMP
) PARTITIONED BY (date_partition STRING)
WITH (
  'connector' = 'hive',
  'streaming-source.enable' = 'false',
  'lookup.join.cache.ttl' = '1 h'
);
```

### Session Configuration

```sql
-- Set session-level configuration
SET 'table.exec.hive.infer-source-parallelism' = 'true';
SET 'table.exec.hive.fallback-mapred-reader' = 'false';
SET 'execution.checkpointing.interval' = '30s';

-- Configure the SQL dialect and writer behavior
SET 'table.sql-dialect' = 'hive';
SET 'table.exec.hive.fallback-mapred-writer' = 'false';
```
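
The same session settings can be applied programmatically. A sketch assuming a `TableEnvironment` named `tableEnv`:

```java
// Programmatic equivalent of the SET statements above
Configuration sessionConf = tableEnv.getConfig().getConfiguration();
sessionConf.setString("table.exec.hive.infer-source-parallelism", "true");
sessionConf.setString("table.exec.hive.fallback-mapred-reader", "false");
sessionConf.setString("execution.checkpointing.interval", "30s");

// Switch to the Hive dialect for Hive-specific DDL
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
```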

## Advanced Configuration Patterns

### Multi-Cluster Configuration

```java
// Configure catalogs for a multi-cluster Hive setup

// Primary cluster configuration
Map<String, String> primaryCatalogProps = Map.of(
    "type", "hive",
    "default-database", "production",
    "hive-conf-dir", "/etc/hive/primary/conf",
    "hadoop-conf-dir", "/etc/hadoop/primary/conf",
    "hive-version", "2.3.9"
);

// Secondary cluster configuration
Map<String, String> secondaryCatalogProps = Map.of(
    "type", "hive",
    "default-database", "analytics",
    "hive-conf-dir", "/etc/hive/secondary/conf",
    "hadoop-conf-dir", "/etc/hadoop/secondary/conf",
    "hive-version", "2.3.9"
);

// Register multiple catalogs (formatProperties is a user-defined helper that
// renders a property map as a WITH (...) clause)
tableEnv.executeSql("CREATE CATALOG primary_hive WITH " + formatProperties(primaryCatalogProps));
tableEnv.executeSql("CREATE CATALOG secondary_hive WITH " + formatProperties(secondaryCatalogProps));

// Cross-cluster queries
Table result = tableEnv.sqlQuery("""
    SELECT p.*, s.analytics_data
    FROM primary_hive.production.users p
    JOIN secondary_hive.analytics.user_metrics s
    ON p.user_id = s.user_id
    """);
```
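
The `formatProperties` helper is not part of any Flink API. One possible hand-rolled implementation that renders the property map as a `WITH (...)` clause might look like this:

```java
// Hypothetical helper: render a property map as a SQL WITH (...) clause
private static String formatProperties(Map<String, String> props) {
    return props.entrySet().stream()
        .map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
        .collect(java.util.stream.Collectors.joining(",\n  ", "(\n  ", "\n)"));
}
```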

### Version-Specific Configuration

```java
// Handle different Hive versions
public class HiveConfigurationManager {

    public Configuration getHive239Configuration() {
        Configuration config = new Configuration();
        config.set(HiveCatalogFactoryOptions.HIVE_VERSION, "2.3.9");
        config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
        config.set(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
        return config;
    }

    public Configuration getHive313Configuration() {
        Configuration config = new Configuration();
        config.set(HiveCatalogFactoryOptions.HIVE_VERSION, "3.1.3");
        // Hive 3.x specific optimizations
        config.set(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
        return config;
    }
}
```
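
How the right configuration is selected is up to the application. A minimal sketch that keys off a version string supplied by the deployment (the environment variable is an assumption; the manager class is the one defined above):

```java
// Pick a configuration based on the Hive version the deployment reports
HiveConfigurationManager manager = new HiveConfigurationManager();
String hiveVersion = System.getenv().getOrDefault("HIVE_VERSION", "2.3.9");

Configuration config = hiveVersion.startsWith("3.")
    ? manager.getHive313Configuration()
    : manager.getHive239Configuration();
```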

### Resource-Aware Configuration

```java
// Configure based on available resources
public Configuration createResourceAwareConfig(int availableCores, long availableMemoryMB) {
    Configuration config = new Configuration();

    // Scale parallelism with the number of cores
    int parallelism = Math.max(1, availableCores / 2);
    config.setString("parallelism.default", String.valueOf(parallelism));

    // Configure memory based on available resources (cap at 8 GB per TaskManager)
    long taskManagerMemory = Math.min(availableMemoryMB / 2, 8192);
    config.setString("taskmanager.memory.process.size", taskManagerMemory + "m");

    // Adjust the monitoring interval based on expected load
    Duration monitorInterval = availableCores > 8
        ? Duration.ofSeconds(5)
        : Duration.ofSeconds(30);
    config.set(HiveOptions.STREAMING_SOURCE_MONITOR_INTERVAL, monitorInterval);

    return config;
}
```
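
A usage sketch that derives the inputs from the JVM's view of the host (these runtime probes are rough approximations, not Flink APIs):

```java
// Derive rough resource figures from the local JVM and build the configuration
int cores = Runtime.getRuntime().availableProcessors();
long memoryMB = Runtime.getRuntime().maxMemory() / (1024 * 1024);

Configuration config = createResourceAwareConfig(cores, memoryMB);
```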