or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

catalog-integration.md configuration-management.md factory-registration.md function-module.md index.md lookup-joins.md table-sources-sinks.md

index.md docs/

0

# Apache Flink SQL Connector for Hive 3.1.2

1

2

The Apache Flink SQL Connector for Hive 3.1.2 enables seamless integration between Apache Flink and Apache Hive 3.1.2, providing unified batch and stream processing capabilities with Hive tables. This connector allows Flink to read from and write to Hive tables, supports both partitioned and non-partitioned tables in streaming and batch modes, and includes comprehensive catalog integration for metadata management.

3

4

## Package Information

5

6

- **Package Name**: flink-sql-connector-hive-3.1.2_2.12

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Group ID**: org.apache.flink

10

- **Artifact ID**: flink-sql-connector-hive-3.1.2_2.12

11

- **Version**: 1.16.3

12

- **Installation**: Add Maven dependency in `pom.xml`:

13

14

```xml

15

<dependency>

16

<groupId>org.apache.flink</groupId>

17

<artifactId>flink-sql-connector-hive-3.1.2_2.12</artifactId>

18

<version>1.16.3</version>

19

</dependency>

20

```

21

22

## Core Imports

23

24

```java

25

// Factory classes for programmatic usage

26

import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;

27

import org.apache.flink.connectors.hive.HiveDynamicTableFactory;

28

import org.apache.flink.table.module.hive.HiveModuleFactory;

29

30

// Core connector classes

31

import org.apache.flink.connectors.hive.HiveTableSource;

32

import org.apache.flink.connectors.hive.HiveTableSink;

33

import org.apache.flink.connectors.hive.HiveSource;

34

import org.apache.flink.connectors.hive.HiveSourceBuilder;

35

36

// Catalog and module classes

37

import org.apache.flink.table.catalog.hive.HiveCatalog;

38

import org.apache.flink.table.module.hive.HiveModule;

39

40

// Configuration and options

41

import org.apache.flink.connectors.hive.HiveOptions;

42

import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;

43

```

44

45

## Basic Usage

46

47

### SQL DDL Usage (Most Common)

48

49

```sql

50

-- Create Hive catalog

51

CREATE CATALOG hive_catalog WITH (

52

'type' = 'hive',

53

'hive-conf-dir' = '/path/to/hive/conf',

54

'hive-version' = '3.1.2'

55

);

56

57

-- Use the Hive catalog

58

USE CATALOG hive_catalog;

59

USE default;

60

61

-- Read from existing Hive table

62

SELECT * FROM my_hive_table WHERE partition_date = '2023-01-01';

63

64

-- Create and write to new Hive table

65

CREATE TABLE new_hive_table (

66

id BIGINT,

67

name STRING,

68

event_time TIMESTAMP,

69

partition_date STRING

70

) PARTITIONED BY (partition_date)

71

STORED AS PARQUET

72

TBLPROPERTIES (

73

'sink.partition-commit.policy.kind' = 'metastore,success-file'

74

);

75

76

INSERT INTO new_hive_table SELECT id, name, event_time, '2023-01-01' FROM source_table;

77

```

78

79

### Programmatic Table API Usage

80

81

```java

82

import org.apache.flink.table.api.EnvironmentSettings;

83

import org.apache.flink.table.api.TableEnvironment;

84

import org.apache.flink.table.catalog.hive.HiveCatalog;

85

86

// Create table environment

87

EnvironmentSettings settings = EnvironmentSettings.newInstance().build();

88

TableEnvironment tableEnv = TableEnvironment.create(settings);

89

90

// Create Hive catalog

91

String catalogName = "hive_catalog";

92

String defaultDatabase = "default";

93

String hiveConfDir = "/path/to/hive/conf";

94

String hiveVersion = "3.1.2";

95

96

HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, hiveVersion);

97

tableEnv.registerCatalog(catalogName, hive);

98

tableEnv.useCatalog(catalogName);

99

100

// Execute queries

101

tableEnv.executeSql("SELECT * FROM my_hive_table LIMIT 10").print();

102

```

103

104

### DataStream API Usage

105

106

```java

107

import org.apache.flink.connectors.hive.HiveSource;

108

import org.apache.flink.connectors.hive.HiveSourceBuilder;

109

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

110

import org.apache.flink.table.data.RowData;

111

112

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

113

114

HiveSource<RowData> hiveSource = new HiveSourceBuilder()

115

.setTableIdentifier(tableIdentifier)

116

.setHiveConfiguration(hiveConf)

117

.build();

118

119

env.fromSource(hiveSource, WatermarkStrategy.noWatermarks(), "Hive Source")

120

.print();

121

122

env.execute("Hive Streaming Job");

123

```

124

125

## Architecture

126

127

The Flink Hive Connector is built around several key components:

128

129

- **Factory System**: SPI-based factory classes for automatic connector discovery and configuration

130

- **Table Connectors**: Source and sink implementations supporting both batch and streaming modes

131

- **Catalog Integration**: HiveCatalog providing unified metadata management across Flink and Hive

132

- **Module System**: HiveModule enabling access to Hive built-in functions within Flink SQL

133

- **Configuration Management**: Comprehensive options for fine-tuning performance and behavior

134

- **Shaded Dependencies**: Self-contained JAR with Hive 3.1.2 dependencies to avoid conflicts

135

136

The connector supports both legacy and modern Flink table interfaces, enabling seamless migration from older versions while providing access to the latest features.

137

138

## Capabilities

139

140

### Factory Registration

141

142

Factory classes that enable automatic connector discovery through Flink's Service Provider Interface (SPI). These factories handle connector instantiation and configuration validation.

143

144

```java { .api }

145

public class HiveCatalogFactory implements CatalogFactory {

146

public String factoryIdentifier(): "hive";

147

}

148

149

public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {

150

public String factoryIdentifier(): "hive";

151

}

152

153

public class HiveModuleFactory implements ModuleFactory {

154

public String factoryIdentifier(): "hive";

155

}

156

```

157

158

[Factory Registration](./factory-registration.md)

159

160

### Table Sources and Sinks

161

162

Core table connector implementations providing read and write capabilities for Hive tables with support for batch processing, streaming ingestion, partition management, and performance optimizations.

163

164

```java { .api }

165

public class HiveTableSource implements ScanTableSource,

166

SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {

167

// Batch and streaming source for Hive tables

168

}

169

170

public class HiveTableSink implements DynamicTableSink {

171

// Sink for writing to Hive tables with partition support

172

}

173

174

public final class HiveSource<T> implements Source<T, HiveSplitEnumeratorState, HiveSourceSplit> {

175

// Low-level source using new Source API

176

}

177

```

178

179

[Table Sources and Sinks](./table-sources-sinks.md)

180

181

### Catalog Integration

182

183

Hive catalog implementation that provides unified metadata management, enabling Flink to access Hive databases, tables, partitions, and functions through the Hive metastore.

184

185

```java { .api }

186

public class HiveCatalog extends AbstractCatalog {

187

public HiveCatalog(String catalogName, String defaultDatabase,

188

String hiveConfDir, String hiveVersion);

189

190

public static boolean isHiveTable(Map<String, String> properties);

191

}

192

```

193

194

[Catalog Integration](./catalog-integration.md)

195

196

### Function Module

197

198

Module system providing access to Hive built-in functions within Flink SQL, enabling compatibility with existing Hive UDFs and maintaining function behavior consistency.

199

200

```java { .api }

201

public class HiveModule implements Module {

202

public HiveModule(String hiveVersion);

203

}

204

```

205

206

[Function Module](./function-module.md)

207

208

### Configuration Management

209

210

Comprehensive configuration options for controlling connector behavior, performance tuning, and feature enablement across reading, writing, streaming, and lookup join scenarios.

211

212

```java { .api }

213

public class HiveOptions {

214

// Reading options

215

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;

216

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;

217

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;

218

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;

219

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;

220

public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_SPLIT_MAX_BYTES;

221

public static final ConfigOption<MemorySize> TABLE_EXEC_HIVE_FILE_OPEN_COST;

222

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_CALCULATE_PARTITION_SIZE_THREAD_NUM;

223

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_DYNAMIC_GROUPING_ENABLED;

224

225

// Writing options

226

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;

227

public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;

228

public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_CLASS;

229

public static final ConfigOption<String> SINK_PARTITION_COMMIT_SUCCESS_FILE_NAME;

230

public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_ENABLE;

231

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_SINK_STATISTIC_AUTO_GATHER_THREAD_NUM;

232

233

// Streaming options

234

public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;

235

public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE;

236

public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;

237

public static final ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;

238

public static final ConfigOption<PartitionOrder> STREAMING_SOURCE_PARTITION_ORDER;

239

240

// Lookup join options

241

public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;

242

}

243

```

244

245

[Configuration Management](./configuration-management.md)

246

247

### Lookup Joins

248

249

Specialized lookup table source for dimension table joins, providing caching capabilities and optimized access patterns for real-time data enrichment scenarios.

250

251

```java { .api }

252

public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

253

// Lookup join capabilities for dimension tables

254

}

255

```

256

257

[Lookup Joins](./lookup-joins.md)

258

259

## Types

260

261

### Core Interfaces and Classes

262

263

```java { .api }

264

// Exception handling

265

public class FlinkHiveException extends RuntimeException {

266

public FlinkHiveException(String message);

267

public FlinkHiveException(Throwable cause);

268

public FlinkHiveException(String message, Throwable cause);

269

}

270

271

// Partition representation

272

public class HiveTablePartition {

273

// Constructors

274

public HiveTablePartition(StorageDescriptor storageDescriptor, Properties tableProps);

275

public HiveTablePartition(StorageDescriptor storageDescriptor,

276

Map<String, String> partitionSpec, Properties tableProps);

277

278

// Instance methods

279

public StorageDescriptor getStorageDescriptor();

280

public Map<String, String> getPartitionSpec();

281

public Properties getTableProps();

282

public boolean equals(Object o);

283

public int hashCode();

284

public String toString();

285

286

// Static factory methods

287

public static HiveTablePartition ofTable(HiveConf hiveConf, String hiveVersion,

288

String dbName, String tableName);

289

public static HiveTablePartition ofPartition(HiveConf hiveConf, String hiveVersion,

290

String dbName, String tableName,

291

LinkedHashMap<String, String> partitionSpec);

292

}

293

294

// Configuration wrapper

295

public class JobConfWrapper implements Serializable {

296

public JobConfWrapper(JobConf jobConf);

297

public JobConf conf();

298

}

299

300

// Source implementation with generic type parameter

301

public final class HiveSource<T> implements Source<T, HiveSplitEnumeratorState, HiveSourceSplit> {

302

// Source interface methods

303

public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();

304

public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> getEnumeratorCheckpointSerializer();

305

public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(

306

SplitEnumeratorContext<HiveSourceSplit> enumContext);

307

public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(

308

SplitEnumeratorContext<HiveSourceSplit> enumContext,

309

PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);

310

}

311

312

// Table source with interface implementations

313

public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,

314

SupportsProjectionPushDown, SupportsLimitPushDown, SupportsStatisticReport, SupportsDynamicFiltering {

315

316

public HiveTableSource(JobConf jobConf, ReadableConfig flinkConf,

317

ObjectPath tablePath, CatalogTable catalogTable);

318

319

// ScanTableSource methods

320

public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext);

321

public ChangelogMode getChangelogMode();

322

public DynamicTableSource copy();

323

public String asSummaryString();

324

325

// Optimization interface methods

326

public void applyLimit(long limit);

327

public Optional<List<Map<String, String>>> listPartitions();

328

public void applyPartitions(List<Map<String, String>> remainingPartitions);

329

public List<String> listAcceptedFilterFields();

330

public void applyDynamicFiltering(List<String> candidateFilterFields);

331

public boolean supportsNestedProjection();

332

public void applyProjection(int[][] projectedFields, DataType producedDataType);

333

public TableStats reportStatistics();

334

}

335

336

// Table sink with interface implementations

337

public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {

338

public HiveTableSink(ReadableConfig flinkConf, JobConf jobConf,

339

ObjectIdentifier identifier, CatalogTable table, Integer configuredParallelism);

340

341

// DynamicTableSink methods

342

public SinkRuntimeProvider getSinkRuntimeProvider(Context context);

343

public ChangelogMode getChangelogMode(ChangelogMode requestedMode);

344

public DynamicTableSink copy();

345

public String asSummaryString();

346

347

// Partitioning and overwrite support

348

public boolean requiresPartitionGrouping(boolean supportsGrouping);

349

public void applyStaticPartition(Map<String, String> partition);

350

public void applyOverwrite(boolean overwrite);

351

}

352

353

// Lookup table source extending HiveTableSource

354

public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {

355

public HiveLookupTableSource(JobConf jobConf, ReadableConfig flinkConf,

356

ObjectPath tablePath, CatalogTable catalogTable);

357

358

// LookupTableSource methods

359

public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);

360

public DynamicTableSource copy();

361

}

362

363

// Builder pattern

364

public class HiveSourceBuilder {

365

// Constructors

366

public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, String hiveVersion,

367

String dbName, String tableName, Map<String, String> tableOptions);

368

public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf, ObjectPath tablePath,

369

String hiveVersion, CatalogTable catalogTable);

370

371

// Configuration methods

372

public HiveSourceBuilder setProjectedFields(int[] projectedFields);

373

public HiveSourceBuilder setLimit(Long limit);

374

public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

375

public HiveSourceBuilder setDynamicFilterPartitionKeys(List<String> dynamicFilterPartitionKeys);

376

377

// Build methods

378

public HiveSource<RowData> buildWithDefaultBulkFormat();

379

public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);

380

}

381

```

382

383

### Enums and Constants

384

385

```java { .api }

386

// Partition ordering strategies

387

public enum PartitionOrder implements DescribedEnum {

388

CREATE_TIME("create-time", "Order partitions by creation time"),

389

PARTITION_TIME("partition-time", "Order partitions by partition time"),

390

PARTITION_NAME("partition-name", "Order partitions by partition name");

391

392

public String toString();

393

public InlineElement getDescription();

394

}

395

396

// Version constants

397

public class HiveShimLoader {

398

public static final String HIVE_VERSION_V3_1_2 = "3.1.2";

399

public static String getHiveVersion();

400

}

401

402

// Catalog configuration constants

403

public class HiveCatalogConfig {

404

public static final String DEFAULT_LIST_COLUMN_TYPES_SEPARATOR = ":";

405

public static final String COMMENT = "comment";

406

public static final String PARTITION_LOCATION = "partition.location";

407

}

408

```