
# Apache Flink SQL Connector for Hive 2.3.9

The Apache Flink SQL connector for Apache Hive 2.3.9 integrates Flink's streaming and batch processing with Apache Hive data warehouses. It provides access to Hive tables, metastore operations, and built-in functions, allowing developers to reuse existing Hive infrastructure within Flink applications.

## Package Information

- **Package Name**: flink-sql-connector-hive-2.3.9_2.12
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Language**: Java
- **Installation**: add the dependency to `pom.xml`
- **Hive Version**: 2.3.9
- **Scala Version**: 2.12

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-sql-connector-hive-2.3.9_2.12</artifactId>
  <version>1.19.3</version>
</dependency>
```

## Core Imports

```java
// Main factories
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;
import org.apache.flink.table.module.hive.HiveModule;

// Source and sink classes
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;

// Configuration
import org.apache.flink.connectors.hive.HiveOptions;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;

// Data structures
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.FlinkHiveException;
```

## Basic Usage

### Table API Integration

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create a batch table environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create and register the Hive catalog
String catalogName = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
String version = "2.3.9";

HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, null, version);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);

// Query Hive tables
Table result = tableEnv.sqlQuery("SELECT * FROM mytable WHERE active = true");
result.execute().print();
```

### DataStream API Integration

```java
import java.util.Collections;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

// Create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build Hive source
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

HiveSource<RowData> source = new HiveSourceBuilder(
        jobConf,
        new Configuration(),
        "2.3.9", // hiveVersion
        "mydb",
        "mytable",
        Collections.emptyMap()
).buildWithDefaultBulkFormat();

// Create data stream
DataStream<RowData> stream = env
        .fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source");

stream.print();
env.execute("Hive Stream Processing");
```

## Architecture

The Flink Hive connector is built around several key components:

- **Factory System**: plugin-based factories (`HiveDynamicTableFactory`, `HiveCatalogFactory`) for dynamic registration and configuration
- **Source/Sink Framework**: unified data access with streaming and batch support through `HiveSource` and `HiveTableSink`
- **Catalog Integration**: Hive metastore integration via `HiveCatalog` for schema discovery and table management
- **Function Module**: Hive built-in function support through `HiveModule` for UDF compatibility
- **Partition Management**: partition pruning and dynamic partition discovery
- **Type System**: type mapping between Hive and Flink data types, including serialization support

## Capabilities

### Table Source and Sink Operations

Core functionality for reading from and writing to Hive tables using Flink's Table API and DataStream API. Supports both batch and streaming modes with partition handling.

```java { .api }
// Main source builder
public class HiveSourceBuilder {
    public HiveSourceBuilder(JobConf jobConf, ReadableConfig flinkConf,
                             String hiveVersion, String dbName,
                             String tableName, Map<String, String> tableOptions);
    public HiveSource<RowData> buildWithDefaultBulkFormat();
    public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    public HiveSourceBuilder setLimit(Long limit);
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);
}

// Dynamic table source
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown {
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    public void applyPartitions(List<Map<String, String>> remainingPartitions);
    public void applyProjection(int[][] projectedFields, DataType producedDataType);
    public void applyLimit(long limit);
}

// Dynamic table sink
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public void applyStaticPartition(Map<String, String> partition);
    public void applyOverwrite(boolean overwrite);
}
```

[Table Source and Sink Operations](./table-source-sink.md)
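In Flink SQL, the same source and sink capabilities surface as table options. A sketch, assuming a registered Hive catalog containing a partitioned table `mydb.mytable` (table and column names are illustrative):

```sql
-- Streaming read: continuously monitor the table for new partitions
-- (keys correspond to STREAMING_SOURCE_ENABLE and STREAMING_SOURCE_MONITOR_INTERVAL)
SELECT * FROM mytable
  /*+ OPTIONS('streaming-source.enable' = 'true',
              'streaming-source.monitor-interval' = '1 min') */;

-- Batch write with overwrite semantics into a static partition
INSERT OVERWRITE mytable PARTITION (dt = '2024-01-01')
SELECT user_id, event_count FROM staging_table;
```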

### Catalog Integration

Complete Hive metastore integration providing schema discovery, table management, and metadata operations. Enables transparent access to existing Hive data warehouses.

```java { .api }
public class HiveCatalog extends AbstractCatalog {
    public HiveCatalog(String catalogName, String defaultDatabase,
                       String hiveConfDir, String hadoopConfDir, String hiveVersion);
    public List<String> listDatabases() throws CatalogException;
    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException;
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
    public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists);
    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath);
}

public class HiveCatalogFactory implements CatalogFactory {
    public Catalog createCatalog(Context context);
    public String factoryIdentifier();
}
```

[Catalog Integration](./catalog-integration.md)
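The catalog can also be registered declaratively from SQL; the `WITH` keys below correspond to the options in `HiveCatalogFactoryOptions` (paths and the database name are placeholders):

```sql
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'default-database' = 'mydatabase',
  'hive-conf-dir' = '/opt/hive-conf',
  'hadoop-conf-dir' = '/opt/hadoop-conf',
  'hive-version' = '2.3.9'
);

USE CATALOG myhive;
```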

### Function Module Integration

Hive built-in function support, enabling use of Hive UDFs within Flink SQL queries through module registration.

```java { .api }
public class HiveModule implements Module {
    public HiveModule();
    public HiveModule(String hiveVersion);
    public Set<String> listFunctions();
    public Optional<FunctionDefinition> getFunctionDefinition(String name);
}
```

[Function Module Integration](./function-module.md)
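From SQL, the module is enabled with `LOAD MODULE`; afterwards Hive built-ins resolve alongside Flink's own functions. A sketch (the function choice is illustrative):

```sql
LOAD MODULE hive WITH ('hive-version' = '2.3.9');

-- get_json_object is a Hive built-in, not a Flink one; it resolves via the module
SELECT get_json_object('{"name":"flink"}', '$.name');
```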

### Configuration and Options

Configuration options for customizing connector behavior, performance tuning, and environment-specific settings.

```java { .api }
public class HiveOptions {
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER;
    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    public static final ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    public static final ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    public static final ConfigOption<String> SINK_PARTITION_COMMIT_POLICY_KIND;
}

public class HiveCatalogFactoryOptions {
    public static final ConfigOption<String> DEFAULT_DATABASE;
    public static final ConfigOption<String> HIVE_CONF_DIR;
    public static final ConfigOption<String> HIVE_VERSION;
    public static final ConfigOption<String> HADOOP_CONF_DIR;
}
```

[Configuration and Options](./configuration.md)
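These options map to string keys that can be set session-wide in the SQL client or per statement via dynamic table options. A sketch of both styles (key names follow the options above; table names are illustrative):

```sql
-- Session-wide: let the planner infer source parallelism from the number of splits
SET 'table.exec.hive.infer-source-parallelism' = 'true';

-- Per statement: commit partitions to the metastore and write a _SUCCESS file
INSERT INTO mytable /*+ OPTIONS('sink.partition-commit.policy.kind' = 'metastore,success-file') */
SELECT * FROM source_table;
```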

### Partition Management and Utilities

Partition handling utilities for efficient data access, partition pruning, and metadata management in partitioned Hive tables.

```java { .api }
public class HiveTablePartition implements Serializable {
    public static HiveTablePartition ofTable(StorageDescriptor storageDescriptor,
                                             Map<String, String> tableParameters);
    public static HiveTablePartition ofPartition(StorageDescriptor storageDescriptor,
                                                 Map<String, String> partitionSpec,
                                                 Map<String, String> tableParameters);
    public StorageDescriptor getStorageDescriptor();
    public Map<String, String> getPartitionSpec();
}

public class HivePartitionUtils {
    public static List<HiveTablePartition> getAllPartitions(JobConf jobConf, String catalogName,
                                                            ObjectPath tablePath, List<String> partitionColNames);
    public static byte[] serializeHiveTablePartition(List<HiveTablePartition> partitions);
    public static List<HiveTablePartition> deserializeHiveTablePartition(byte[] bytes, ClassLoader classLoader);
}
```

[Partition Management and Utilities](./partition-management.md)

## Types

```java { .api }
// Core exception type
public class FlinkHiveException extends RuntimeException {
    public FlinkHiveException(String message);
    public FlinkHiveException(Throwable cause);
    public FlinkHiveException(String message, Throwable cause);
}

// Configuration wrapper
public class JobConfWrapper implements Serializable {
    public JobConfWrapper(JobConf jobConf);
    public JobConf conf();
}
```