or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6_2-12

Apache Flink SQL connector for Apache Hive 2.3.6 providing integration between Flink and Hive for reading/writing Hive tables and using Hive Metastore as a catalog.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-sql-connector-hive-2.3.6_2.12@1.15.x

To install, run:

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6_2-12@1.15.0

# Apache Flink Hive Connector 2.3.6

Apache Flink SQL connector for Apache Hive 2.3.6 that provides comprehensive integration between Flink and Hive, enabling both batch and streaming access to Hive tables, Hive catalog integration, and Hive function support. This connector serves as a bridge between Flink's unified stream and batch processing capabilities and the Hive data warehouse ecosystem.

## Package Information

- **Package Name**: flink-sql-connector-hive-2.3.6_2.12
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to Maven dependencies:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
    <version>1.15.4</version>
</dependency>
```

For runtime classpath (if not using the uber JAR):

```bash
# Download and place in Flink lib directory
cp flink-sql-connector-hive-2.3.6_2.12-1.15.4.jar $FLINK_HOME/lib/
```

## Core Imports

```java
// Catalog integration
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory;

// DataStream API source
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;

// Table API integration
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.connectors.hive.HiveLookupTableSource;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;

// Module for Hive functions
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.table.module.hive.HiveModuleOptions;

// Configuration options
import org.apache.flink.connectors.hive.HiveOptions;

// Exception handling
import org.apache.flink.connectors.hive.FlinkHiveException;

// Annotations for nullability
import javax.annotation.Nullable;
import javax.annotation.Nonnull;
```

## Basic Usage

### Catalog Integration

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Create table environment
EnvironmentSettings settings = EnvironmentSettings
    .newInstance()
    .inBatchMode()
    .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Create and register Hive catalog
String catalogName = "myhive";
String defaultDatabase = "default";
String hiveConfDir = "/opt/hive-conf";
String hadoopConfDir = "/opt/hadoop-conf";
String hiveVersion = "2.3.6";

HiveCatalog hive = new HiveCatalog(
    catalogName, defaultDatabase, hiveConfDir, hadoopConfDir, hiveVersion);
tableEnv.registerCatalog(catalogName, hive);
tableEnv.useCatalog(catalogName);

// Use Hive tables with SQL
tableEnv.executeSql("SELECT * FROM my_hive_table").print();
```

### DataStream API Usage

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create job configuration for Hive (HiveSourceBuilder expects a JobConf)
JobConf jobConf = new JobConf();
// Set Hive metastore URI and other configs
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");

// Build Hive source
HiveSource<RowData> source = new HiveSourceBuilder(
    jobConf,
    env.getConfiguration(),
    "2.3.6",
    "default",
    "my_table",
    Collections.emptyMap()
).buildWithDefaultBulkFormat();

// Add source to stream
env.fromSource(source, WatermarkStrategy.noWatermarks(), "hive-source")
    .print();

env.execute("Hive Stream Job");
```

### Module Registration for Hive Functions

```java
import org.apache.flink.table.module.hive.HiveModule;

// Register Hive module to access Hive built-in functions
tableEnv.loadModule("hive", new HiveModule("2.3.6"));

// Use Hive functions in SQL
tableEnv.executeSql("SELECT concat('Hello', ' ', 'World')").print();
```

## Architecture

The Flink Hive Connector is built around several key components:

- **Catalog Integration**: `HiveCatalog` provides full metastore integration for persistent table metadata
- **Source/Sink Components**: `HiveSource`, `HiveTableSource`, and `HiveTableSink` for data access
- **Module System**: `HiveModule` enables access to Hive built-in functions within Flink SQL
- **Configuration Management**: Comprehensive options for tuning performance and behavior
- **Version Abstraction**: Shim system supporting multiple Hive versions with consistent API
- **File Format Support**: Works with Parquet, ORC, text files, and other Hadoop-compatible formats

## Capabilities

### Catalog Integration

Complete Hive metastore integration allowing Flink to use Hive as a persistent catalog for storing table definitions, schemas, and metadata across sessions.

```java { .api }
HiveCatalog(String catalogName, @Nullable String defaultDatabase, @Nullable String hiveConfDir,
            @Nullable String hadoopConfDir, @Nullable String hiveVersion);

// Database operations
void createDatabase(CatalogDatabase database, boolean ignoreIfExists);
CatalogDatabase getDatabase(String databaseName);
List<String> listDatabases();

// Table operations
void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists);
CatalogBaseTable getTable(ObjectPath tablePath);
List<String> listTables(String databaseName);
```

[Catalog Integration](./catalog.md)

### DataStream Source

Low-level streaming and batch source for reading Hive tables directly in DataStream API programs with full control over parallelism, partitioning, and data formats.

```java { .api }
class HiveSourceBuilder {
    HiveSourceBuilder(@Nonnull JobConf jobConf, @Nonnull ReadableConfig flinkConf, @Nullable String hiveVersion,
                      @Nonnull String dbName, @Nonnull String tableName, @Nonnull Map<String, String> tableOptions);

    HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    HiveSourceBuilder setLimit(@Nullable Long limit);
    HiveSourceBuilder setProjectedFields(int[] projectedFields);

    HiveSource<RowData> buildWithDefaultBulkFormat();
    <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}

class HiveTablePartition {
    static HiveTablePartition ofTable(HiveConf hiveConf, @Nullable String hiveVersion,
                                      String dbName, String tableName);
    static HiveTablePartition ofPartition(HiveConf hiveConf, @Nullable String hiveVersion,
                                          String dbName, String tableName,
                                          LinkedHashMap<String, String> partitionSpec);
}
```

[DataStream Source](./datastream-source.md)

### Table API Integration

High-level Table API integration providing `HiveTableSource` and `HiveTableSink` for seamless SQL access to Hive tables with pushdown optimizations.

```java { .api }
// Created automatically via catalog registration
// Supports predicate pushdown, projection pushdown, partition pruning
interface SupportsPartitionPushDown {
    Result applyPartitions(List<Map<String, String>> remainingPartitions);
}

interface SupportsProjectionPushDown {
    boolean supportsNestedProjection();
    void applyProjection(int[][] projectedFields);
}

interface SupportsLimitPushDown {
    void applyLimit(long limit);
}
```

[Table API Integration](./table-api.md)

### Hive Functions

Module system integration enabling access to Hive built-in functions within Flink SQL, including string functions, date functions, and mathematical operations.

```java { .api }
class HiveModule implements Module {
    HiveModule();
    HiveModule(String hiveVersion);

    Set<String> listFunctions();
    Optional<FunctionDefinition> getFunctionDefinition(String name);
    String getHiveVersion();
}
```

[Hive Functions](./hive-functions.md)

### Configuration Options

Comprehensive configuration system for tuning connector behavior, performance optimization, and streaming source configuration.

```java { .api }
class HiveOptions {
    // Performance tuning
    ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM;
    ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX;

    // Streaming source options
    ConfigOption<Boolean> STREAMING_SOURCE_ENABLE;
    ConfigOption<Duration> STREAMING_SOURCE_MONITOR_INTERVAL;
    ConfigOption<String> STREAMING_SOURCE_CONSUME_START_OFFSET;

    // Lookup join caching
    ConfigOption<Duration> LOOKUP_JOIN_CACHE_TTL;
}

enum PartitionOrder {
    CREATE_TIME, PARTITION_TIME, PARTITION_NAME
}
```

[Configuration Options](./configuration.md)

## Types

```java { .api }
class FlinkHiveException extends RuntimeException {
    FlinkHiveException(String message);
    FlinkHiveException(Throwable cause);
    FlinkHiveException(String message, Throwable cause);
}

class HiveSourceSplit extends FileSourceSplit {
    // Represents a split of Hive data for parallel processing
}

interface HiveFunction {
    void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes);
    DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes);
}
```