Apache Flink SQL connector for Apache Hive 2.3.6 with Scala 2.11 binary compatibility
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11@1.14.0

The Apache Flink SQL connector for Apache Hive 2.3.6 integrates Flink's streaming and batch processing with Hive's data warehousing infrastructure. The connector supports reading from and writing to Hive tables with Flink SQL queries, along with Hive metastore catalog integration and access to Hive built-in functions.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-hive-2.3.6_2.11</artifactId>
    <version>1.14.6</version>
</dependency>

The main entry points of the connector:

import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.connectors.hive.HiveDynamicTableFactory;
import org.apache.flink.connectors.hive.HiveTableSource;
import org.apache.flink.connectors.hive.HiveTableSink;
import org.apache.flink.table.module.hive.HiveModule;

A quickstart that registers a Hive catalog and loads the Hive module:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
// Create Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog(
    "hive_catalog",          // catalog name
    "default",               // default database
    "/path/to/hive/conf",    // directory containing hive-site.xml
    "/path/to/hadoop/conf",  // hadoop conf dir
    "2.3.6"                  // hive version
);

// Register catalog with table environment
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.registerCatalog("hive_catalog", hiveCatalog);
tableEnv.useCatalog("hive_catalog");

// Load Hive module for UDF support
tableEnv.loadModule("hive", new HiveModule("2.3.6"));

// Use Hive tables with SQL
tableEnv.executeSql("SELECT * FROM hive_table WHERE partition_key = 'value'");

The Apache Flink Hive connector is built around several key components:
- HiveCatalog provides full integration with the Hive metastore for database and table metadata management
- HiveTableSource and HiveTableSink handle data reading and writing with support for various file formats
- HiveDynamicTableFactory creates table sources and sinks based on catalog metadata
- HiveModule provides access to Hive built-in functions (UDF/UDAF/UDTF)
- HiveShim implementations ensure compatibility across Hive versions 1.0.0 to 3.1.2
- HiveParser enables Hive dialect SQL parsing for enhanced compatibility (see the sketch after this list)
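Hive dialect parsing is switched on through the table configuration. A minimal sketch, reusing the tableEnv from the quickstart above (the queried table src is hypothetical):

import org.apache.flink.table.api.SqlDialect;

// Statements after this call are parsed by HiveParser (Hive dialect)
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("SELECT key, count(*) FROM src GROUP BY key");

// Restore Flink's default dialect
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);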
Complete Hive metastore integration for managing databases, tables, partitions, and metadata. Supports all standard catalog operations.

public class HiveCatalog extends AbstractCatalog {
    public HiveCatalog(String catalogName, String defaultDatabase, String hiveConfDir, String hadoopConfDir, String hiveVersion);
    public void open() throws CatalogException;
    public void close() throws CatalogException;
    public List<String> listDatabases() throws CatalogException;
    public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException;
    public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException;
    public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException;
}
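The catalog can also be driven programmatically. A minimal sketch, assuming the hiveCatalog instance from the quickstart above and a hypothetical default.orders table:

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;

hiveCatalog.open();
try {
    // Enumerate databases registered in the Hive metastore
    for (String db : hiveCatalog.listDatabases()) {
        System.out.println("database: " + db);
    }
    // Fetch table metadata by database and table name
    CatalogBaseTable orders = hiveCatalog.getTable(new ObjectPath("default", "orders"));
    System.out.println(orders.getOptions());
} catch (TableNotExistException e) {
    System.err.println("missing table: " + e.getMessage());
} finally {
    hiveCatalog.close();
}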
Reading data from Hive tables with support for both batch and streaming modes, partition pruning, projection pushdown, and lookup joins.

public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown {
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    public void applyLimit(long limit);
    public boolean supportsNestedProjection();
    public void applyProjection(int[][] projectedFields);
    public void applyPartitions(List<Map<String, String>> remainingPartitions);
}

public class HiveLookupTableSource implements LookupTableSource, ScanTableSource {
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
}
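These sources are normally exercised through SQL rather than instantiated directly. A sketch of a batch read that benefits from partition pruning (the dt filter) and projection pushdown; the table and column names are hypothetical:

// The WHERE clause on the partition column prunes partitions before reading;
// selecting only two columns triggers projection pushdown
tableEnv.executeSql(
        "SELECT user_id, amount " +
        "FROM hive_catalog.`default`.orders " +
        "WHERE dt = '2021-09-01'")
    .print();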
Writing data to Hive tables with support for partitioning, multiple file formats, and streaming ingestion with compaction.

public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    public DynamicTableSink applyStaticPartition(Map<String, String> partition);
    public boolean requiresPartitionGrouping(boolean supportsGrouping);
}
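Writes likewise go through SQL. A sketch of a static-partition INSERT OVERWRITE in batch mode (table, column, and partition names are hypothetical):

// Overwrite a single static partition; overwrite requires batch execution mode
tableEnv.executeSql(
    "INSERT OVERWRITE hive_catalog.`default`.orders " +
    "PARTITION (dt='2021-09-01') " +
    "SELECT user_id, amount FROM staging_orders");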
Access to Hive built-in functions including UDF, UDAF, and UDTF through the HiveModule system with version-specific compatibility.

public class HiveModule implements Module {
    public HiveModule(String hiveVersion);
    public Set<String> listFunctions();
    public Optional<FunctionDefinition> getFunctionDefinition(String name);
}

public interface HiveFunction {
    // Marker interface for Hive function wrappers
}

public class HiveGenericUDF extends ScalarFunction implements HiveFunction {
    public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim);
}
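Once the module is loaded, Hive built-ins resolve like any other SQL function. A minimal sketch using Hive's get_json_object, which core Flink SQL does not provide in this version:

tableEnv.loadModule("hive", new HiveModule("2.3.6"));

// get_json_object is resolved through HiveModule, not Flink's core function catalog
tableEnv.executeSql(
    "SELECT get_json_object('{\"name\":\"flink\"}', '$.name')").print();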
New Source API implementation for Hive tables providing enhanced control over split enumeration and reading with support for continuous partition monitoring.

public class HiveSource<T> implements Source<T, HiveSourceSplit, ContinuousHivePendingSplitsCheckpoint> {
    public SourceReader<T, HiveSourceSplit> createReader(SourceReaderContext readerContext);
    public SplitEnumerator<HiveSourceSplit, ContinuousHivePendingSplitsCheckpoint> createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();
    public SimpleVersionedSerializer<ContinuousHivePendingSplitsCheckpoint> getEnumeratorCheckpointSerializer();
}
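Continuous partition monitoring is usually enabled through the connector's streaming-source options rather than by constructing HiveSource by hand. A sketch using a dynamic table options hint (the table name is hypothetical; if hints are disabled in your setup, set table.dynamic-table-options.enabled to true first):

// Stream new partitions as they land, checking the metastore once per minute
tableEnv.executeSql(
    "SELECT * FROM hive_catalog.`default`.orders " +
    "/*+ OPTIONS('streaming-source.enable'='true', " +
    "'streaming-source.monitor-interval'='1 min') */");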
Configuration options and factory classes for setting up Hive integration with customizable behavior for performance and compatibility.

public class HiveOptions {
    public static final ConfigOption<Integer> TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_READ_PARTITION_WITH_SUBDIRECTORY_ENABLED;
    public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_NATIVE_AGG_FUNCTION_ENABLED;
}

public class HiveCatalogFactory implements CatalogFactory {
    public String factoryIdentifier();
    public Catalog createCatalog(Context context);
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
}
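HiveCatalogFactory backs the DDL path: a catalog can be declared in SQL instead of being constructed in Java. A sketch that sets one option and then declares a catalog (paths are placeholders; the option key mirrors TABLE_EXEC_HIVE_LOAD_PARTITION_SPLITS_THREAD_NUM):

// Raise the parallelism used to load partition splits
tableEnv.getConfig().getConfiguration()
    .setInteger("table.exec.hive.load-partition-splits.thread-num", 8);

// Declare a Hive catalog via DDL; resolved through HiveCatalogFactory ('type' = 'hive')
tableEnv.executeSql(
    "CREATE CATALOG myhive WITH (" +
    "  'type' = 'hive'," +
    "  'default-database' = 'default'," +
    "  'hive-conf-dir' = '/path/to/hive/conf'" +
    ")");
tableEnv.useCatalog("myhive");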
Supporting data structures for partition metadata, error reporting, and cross-version compatibility.

public class HiveTablePartition {
    public HiveTablePartition(StorageDescriptor storageDescriptor, Map<String, String> partitionSpec);
    public StorageDescriptor getStorageDescriptor();
    public Map<String, String> getPartitionSpec();
}

public class FlinkHiveException extends RuntimeException {
    public FlinkHiveException(String message);
    public FlinkHiveException(String message, Throwable cause);
}

public interface HiveShim {
    // Version-specific Hive compatibility interface
}

Split and checkpoint types used by HiveSource for batch reads and continuous partition monitoring.

public class HiveSourceSplit implements SourceSplit {
    public String splitId();
    // Split information for Hive table reading
}

public class ContinuousHivePendingSplitsCheckpoint {
    // Checkpoint information for continuous Hive monitoring
}