or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-operations.mdconfiguration.mdhive-functions.mdindex.mdsource-api.mdtable-sinks.mdtable-sources.md

configuration.mddocs/

0

# Configuration

Configuration options and factory classes for setting up Hive integration with customizable behavior for performance, compatibility, and operational requirements.

## Capabilities

### HiveOptions

Configuration options for tuning Hive connector behavior and performance.

```java { .api }
/**
 * Configuration options for Hive connector operations
 */
public class HiveOptions {
    /**
     * Whether to use Hadoop MapRed record reader for ORC files
     * Default: false
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER =
            ConfigOptions.key("table.exec.hive.fallback-mapred-reader")
                    .defaultValue(false)
                    .withDescription(
                            "If it is false, using flink native vectorized reader to read orc files; " +
                            "If it is true, using hadoop mapred record reader to read orc files.");

    /**
     * Whether to infer source parallelism based on splits
     * Default: true
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
            ConfigOptions.key("table.exec.hive.infer-source-parallelism")
                    .defaultValue(true)
                    .withDescription(
                            "If is false, parallelism of source are set by config.\n" +
                            "If is true, source parallelism is inferred according to splits number.\n");

    /**
     * Maximum inferred parallelism for source operator
     * Default: 1000
     */
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
            ConfigOptions.key("table.exec.hive.infer-source-parallelism.max")
                    .defaultValue(1000)
                    .withDescription("Sets max infer parallelism for source operator.");

    /**
     * Whether to use Hadoop MapRed record writer for Parquet and ORC files
     * Default: true
     */
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
            ConfigOptions.key("table.exec.hive.fallback-mapred-writer")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription(
                            "If it is false, using flink native writer to write parquet and orc files; " +
                            "If it is true, using hadoop mapred record writer to write parquet and orc files.");
}
```

### Catalog Factory Configuration

Factory and configuration options for creating Hive catalog instances.

```java { .api }
/**
 * Factory for creating HiveCatalog instances with configuration validation
 */
public class HiveCatalogFactory implements CatalogFactory {
    /**
     * Get the factory identifier for service discovery
     * @return "hive" identifier string
     */
    public String factoryIdentifier();

    /**
     * Create HiveCatalog from configuration context
     * @param context - Factory context with configuration options
     * @return Configured HiveCatalog instance
     */
    public Catalog createCatalog(Context context);

    /**
     * Get required configuration options
     * @return Set of required ConfigOption objects (empty for Hive)
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Get optional configuration options
     * @return Set of optional ConfigOption objects
     */
    public Set<ConfigOption<?>> optionalOptions();
}

/**
 * Configuration options for HiveCatalogFactory
 */
public class HiveCatalogFactoryOptions {
    /**
     * Factory identifier for service discovery
     */
    public static final String IDENTIFIER = "hive";

    /**
     * Default database name for the catalog
     * Default: "default"
     */
    public static final ConfigOption<String> DEFAULT_DATABASE =
            ConfigOptions.key("default-database")
                    .stringType()
                    .defaultValue("default")
                    .withDescription("Default database name for the catalog.");

    /**
     * Directory containing hive-site.xml configuration file
     * Default: null (uses classpath)
     */
    public static final ConfigOption<String> HIVE_CONF_DIR =
            ConfigOptions.key("hive-conf-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Directory containing hive-site.xml configuration file.");

    /**
     * Directory containing Hadoop configuration files
     * Default: null (uses classpath)
     */
    public static final ConfigOption<String> HADOOP_CONF_DIR =
            ConfigOptions.key("hadoop-conf-dir")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Directory containing Hadoop configuration files.");

    /**
     * Hive version string for compatibility
     * Default: null (auto-detected)
     */
    public static final ConfigOption<String> HIVE_VERSION =
            ConfigOptions.key("hive-version")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Hive version string for compatibility.");
}
```

### Module Factory Configuration

Factory and configuration for HiveModule creation.

```java { .api }
/**
 * Factory for creating HiveModule instances
 */
public class HiveModuleFactory implements ModuleFactory {
    /**
     * Get the factory identifier
     * @return "hive" identifier string
     */
    public String factoryIdentifier();

    /**
     * Create HiveModule from configuration context
     * @param context - Factory context with configuration options
     * @return Configured HiveModule instance
     */
    public Module createModule(Context context);

    /**
     * Get required configuration options
     * @return Set of required ConfigOption objects (empty for Hive module)
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Get optional configuration options
     * @return Set of optional ConfigOption objects
     */
    public Set<ConfigOption<?>> optionalOptions();
}

/**
 * Configuration options for HiveModule
 */
public class HiveModuleOptions {
    /**
     * Hive version for function compatibility
     * Default: null (uses latest supported)
     */
    public static final ConfigOption<String> HIVE_VERSION =
            ConfigOptions.key("hive-version")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Hive version for function compatibility.");
}
```

### Streaming Source Configuration

Configuration options specific to streaming Hive sources.

```java { .api }
/**
 * Configuration options from FileSystemConnectorOptions used by Hive connector
 */
public class FileSystemConnectorOptions {
    /**
     * Enable streaming source mode for partition monitoring
     * Default: false
     */
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
            ConfigOptions.key("streaming-source.enable")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Enable streaming source mode for partition monitoring.");

    /**
     * Which partitions to include in streaming mode
     * Options: "all", "latest"
     * Default: "all"
     */
    public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE =
            ConfigOptions.key("streaming-source.partition.include")
                    .stringType()
                    .defaultValue("all")
                    .withDescription("Which partitions to include: 'all' or 'latest'.");

    /**
     * Interval for monitoring new partitions (in milliseconds)
     * Default: 60000 (1 minute)
     */
    public static final ConfigOption<Long> STREAMING_SOURCE_MONITOR_INTERVAL =
            ConfigOptions.key("streaming-source.monitor-interval")
                    .longType()
                    .defaultValue(60000L)
                    .withDescription("Interval for monitoring new partitions in milliseconds.");

    /**
     * Configured parallelism for sink operations
     * Default: null (uses default parallelism)
     */
    public static final ConfigOption<Integer> SINK_PARALLELISM =
            ConfigOptions.key("sink.parallelism")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Configured parallelism for sink operations.");
}
```

### Hadoop and Hive Configuration Utilities

Utility classes for managing Hadoop and Hive configuration.

```java { .api }
/**
 * Utilities for managing Hive configuration
 */
public class HiveConfUtils {
    /**
     * Create HiveConf from configuration directory
     * @param hiveConfDir - Directory containing hive-site.xml (can be null)
     * @return Configured HiveConf instance
     */
    public static HiveConf create(String hiveConfDir);

    /**
     * Get Hive configuration with custom properties
     * @param hiveConf - Base Hive configuration
     * @param customProps - Additional properties to set
     * @return Updated HiveConf instance
     */
    public static HiveConf create(HiveConf hiveConf, Map<String, String> customProps);
}

/**
 * Utilities for managing Hadoop JobConf
 */
public class JobConfUtils {
    /**
     * Create JobConf with security credentials
     * @param hiveConf - Hive configuration to base JobConf on
     * @return JobConf with security credentials configured
     */
    public static JobConf createJobConfWithCredentials(HiveConf hiveConf);

    /**
     * Create JobConf with custom properties
     * @param hiveConf - Base Hive configuration
     * @param extraConf - Additional configuration properties
     * @return Configured JobConf instance
     */
    public static JobConf createJobConf(HiveConf hiveConf, Map<String, String> extraConf);
}

/**
 * Factory for creating Hadoop FileSystem instances
 */
public class HadoopFileSystemFactory {
    /**
     * Create file system factory with configuration
     * @param hadoopConf - Hadoop configuration
     */
    public HadoopFileSystemFactory(org.apache.hadoop.conf.Configuration hadoopConf);

    /**
     * Create file system for given URI
     * @param fsUri - File system URI
     * @return FileSystem instance
     * @throws IOException if creation fails
     */
    public FileSystem create(URI fsUri) throws IOException;
}
```

### Dynamic Table Factory Configuration

Configuration for the dynamic table factory system.

```java { .api }
/**
 * Configuration options for dynamic table operations
 */
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    /**
     * Get factory identifier (throws UnsupportedOperationException)
     * Hive factory only works through catalog, not standalone
     * @return Not supported
     * @throws UnsupportedOperationException always
     */
    public String factoryIdentifier();

    /**
     * Get required options (throws UnsupportedOperationException)
     * @return Not supported
     * @throws UnsupportedOperationException always
     */
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Get optional options (throws UnsupportedOperationException)
     * @return Not supported
     * @throws UnsupportedOperationException always
     */
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Create dynamic table source based on context
     * @param context - Creation context with catalog table info
     * @return DynamicTableSource implementation
     */
    public DynamicTableSource createDynamicTableSource(Context context);

    /**
     * Create dynamic table sink based on context
     * @param context - Creation context with catalog table info
     * @return DynamicTableSink implementation
     */
    public DynamicTableSink createDynamicTableSink(Context context);
}
```

**Usage Examples:**

```java
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;

// Configure Hive connector options
Configuration config = new Configuration();

// Use native Flink readers for better performance
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);

// Use native Flink writers for better performance
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);

// Enable source parallelism inference
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 500);

// Create table environment with configuration
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance()
                .inBatchMode()
                .withConfiguration(config)
                .build()
);
```

```java
// Create Hive catalog with full configuration options
Map<String, String> catalogOptions = new HashMap<>();
catalogOptions.put("type", "hive");
catalogOptions.put("default-database", "analytics");
catalogOptions.put("hive-conf-dir", "/opt/hive/conf");
catalogOptions.put("hadoop-conf-dir", "/opt/hadoop/etc/hadoop");
catalogOptions.put("hive-version", "2.3.6");

// Use catalog factory to create catalog
CatalogFactory.Context context = new CatalogFactory.Context() {
    public String getName() { return "hive_catalog"; }
    public Map<String, String> getOptions() { return catalogOptions; }
    public ReadableConfig getConfiguration() { return Configuration.fromMap(catalogOptions); }
    public ClassLoader getClassLoader() { return Thread.currentThread().getContextClassLoader(); }
};

HiveCatalogFactory factory = new HiveCatalogFactory();
Catalog hiveCatalog = factory.createCatalog(context);

tableEnv.registerCatalog("hive_catalog", hiveCatalog);
tableEnv.useCatalog("hive_catalog");
```

```java
// Configure streaming Hive source
tableEnv.executeSql(
    "CREATE TABLE streaming_events (" +
    " event_id BIGINT," +
    " user_id BIGINT," +
    " event_time TIMESTAMP(3)," +
    " event_type STRING," +
    " partition_date STRING" +
    ") PARTITIONED BY (partition_date) " +
    "STORED AS PARQUET " +
    "TBLPROPERTIES (" +
    " 'streaming-source.enable' = 'true'," +
    " 'streaming-source.partition.include' = 'all'," +
    " 'streaming-source.monitor-interval' = '30000'" + // 30 seconds
    ")"
);

// Query streaming table
Table result = tableEnv.sqlQuery(
    "SELECT event_type, COUNT(*) as event_count " +
    "FROM streaming_events " +
    "WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
    "GROUP BY event_type"
);
```

```java
// Load Hive module with specific version
Map<String, String> moduleOptions = new HashMap<>();
moduleOptions.put("hive-version", "2.3.6");

ModuleFactory.Context moduleContext = new ModuleFactory.Context() {
    public Map<String, String> getOptions() { return moduleOptions; }
    public ReadableConfig getConfiguration() { return Configuration.fromMap(moduleOptions); }
    public ClassLoader getClassLoader() { return Thread.currentThread().getContextClassLoader(); }
};

HiveModuleFactory moduleFactory = new HiveModuleFactory();
Module hiveModule = moduleFactory.createModule(moduleContext);

tableEnv.loadModule("hive", hiveModule);
```

## Types

```java { .api }
public interface ConfigOption<T> {
    /**
     * Get the option key
     * @return Configuration key string
     */
    String key();

    /**
     * Get the default value
     * @return Default value for this option
     */
    T defaultValue();

    /**
     * Get the option description
     * @return Human-readable description
     */
    String description();
}

public interface CatalogFactory extends Factory {
    /**
     * Create catalog from context
     * @param context - Creation context
     * @return Catalog instance
     */
    Catalog createCatalog(Context context);

    /**
     * Context interface for catalog creation
     */
    interface Context {
        String getName();
        Map<String, String> getOptions();
        ReadableConfig getConfiguration();
        ClassLoader getClassLoader();
    }
}

public interface ModuleFactory extends Factory {
    /**
     * Create module from context
     * @param context - Creation context
     * @return Module instance
     */
    Module createModule(Context context);

    /**
     * Context interface for module creation
     */
    interface Context {
        Map<String, String> getOptions();
        ReadableConfig getConfiguration();
        ClassLoader getClassLoader();
    }
}

public interface Factory {
    /**
     * Get unique factory identifier
     * @return Factory identifier string
     */
    String factoryIdentifier();

    /**
     * Get required configuration options
     * @return Set of required options
     */
    Set<ConfigOption<?>> requiredOptions();

    /**
     * Get optional configuration options
     * @return Set of optional options
     */
    Set<ConfigOption<?>> optionalOptions();
}
```