Apache Flink SQL connector for Apache Hive 2.3.6 with Scala 2.11 binary compatibility
Configuration options and factory classes for setting up Hive integration with customizable behavior for performance, compatibility, and operational requirements.
Configuration options for tuning Hive connector behavior and performance.
/**
* Configuration options for Hive connector operations
*/
public class HiveOptions {
/**
* Whether to use Hadoop MapRed record reader for ORC files
* Default: false
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER =
ConfigOptions.key("table.exec.hive.fallback-mapred-reader")
.booleanType()
.defaultValue(false)
.withDescription(
"If false, use Flink's native vectorized reader to read ORC files; " +
"if true, use the Hadoop MapRed record reader.");
/**
* Whether to infer source parallelism based on splits
* Default: true
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM =
ConfigOptions.key("table.exec.hive.infer-source-parallelism")
.booleanType()
.defaultValue(true)
.withDescription(
"If false, source parallelism is set by configuration. " +
"If true, source parallelism is inferred from the number of splits.");
/**
* Maximum inferred parallelism for source operator
* Default: 1000
*/
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX =
ConfigOptions.key("table.exec.hive.infer-source-parallelism.max")
.intType()
.defaultValue(1000)
.withDescription("Sets the maximum inferred parallelism for the source operator.");
/**
* Whether to use Hadoop MapRed record writer for Parquet and ORC files
* Default: true
*/
public static final ConfigOption<Boolean> TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER =
ConfigOptions.key("table.exec.hive.fallback-mapred-writer")
.booleanType()
.defaultValue(true)
.withDescription(
"If it is false, using flink native writer to write parquet and orc files; " +
"If it is true, using hadoop mapred record writer to write parquet and orc files.");
}
Factory and configuration options for creating Hive catalog instances.
/**
* Factory for creating HiveCatalog instances with configuration validation
*/
public class HiveCatalogFactory implements CatalogFactory {
/**
* Get the factory identifier for service discovery
* @return "hive" identifier string
*/
public String factoryIdentifier();
/**
* Create HiveCatalog from configuration context
* @param context - Factory context with configuration options
* @return Configured HiveCatalog instance
*/
public Catalog createCatalog(Context context);
/**
* Get required configuration options
* @return Set of required ConfigOption objects (empty for Hive)
*/
public Set<ConfigOption<?>> requiredOptions();
/**
* Get optional configuration options
* @return Set of optional ConfigOption objects
*/
public Set<ConfigOption<?>> optionalOptions();
}
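In practice this factory is usually exercised through SQL DDL rather than instantiated directly: Flink discovers it by its "hive" identifier. A minimal sketch, assuming a TableEnvironment named tableEnv and a hive-site.xml under /opt/hive/conf (both illustrative):
// Register a Hive catalog via DDL; the 'type' option selects HiveCatalogFactory
tableEnv.executeSql(
"CREATE CATALOG my_hive WITH (" +
" 'type' = 'hive'," +
" 'hive-conf-dir' = '/opt/hive/conf'" +
")");
tableEnv.useCatalog("my_hive");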
/**
* Configuration options for HiveCatalogFactory
*/
public class HiveCatalogFactoryOptions {
/**
* Factory identifier for service discovery
*/
public static final String IDENTIFIER = "hive";
/**
* Default database name for the catalog
* Default: "default"
*/
public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key("default-database")
.stringType()
.defaultValue("default")
.withDescription("Default database name for the catalog.");
/**
* Directory containing hive-site.xml configuration file
* Default: null (uses classpath)
*/
public static final ConfigOption<String> HIVE_CONF_DIR =
ConfigOptions.key("hive-conf-dir")
.stringType()
.noDefaultValue()
.withDescription("Directory containing hive-site.xml configuration file.");
/**
* Directory containing Hadoop configuration files
* Default: null (uses classpath)
*/
public static final ConfigOption<String> HADOOP_CONF_DIR =
ConfigOptions.key("hadoop-conf-dir")
.stringType()
.noDefaultValue()
.withDescription("Directory containing Hadoop configuration files.");
/**
* Hive version string for compatibility
* Default: null (auto-detected)
*/
public static final ConfigOption<String> HIVE_VERSION =
ConfigOptions.key("hive-version")
.stringType()
.noDefaultValue()
.withDescription("Hive version string for compatibility.");
}
Factory and configuration for HiveModule creation.
/**
* Factory for creating HiveModule instances
*/
public class HiveModuleFactory implements ModuleFactory {
/**
* Get the factory identifier
* @return "hive" identifier string
*/
public String factoryIdentifier();
/**
* Create HiveModule from configuration context
* @param context - Factory context with configuration options
* @return Configured HiveModule instance
*/
public Module createModule(Context context);
/**
* Get required configuration options
* @return Set of required ConfigOption objects (empty for Hive module)
*/
public Set<ConfigOption<?>> requiredOptions();
/**
* Get optional configuration options
* @return Set of optional ConfigOption objects
*/
public Set<ConfigOption<?>> optionalOptions();
}
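Like the catalog factory, the module factory is normally triggered through SQL rather than called directly. A minimal sketch, assuming a TableEnvironment named tableEnv (illustrative):
// Load the Hive module via DDL; the options map is passed to HiveModuleFactory
tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '2.3.6')");
// Make Hive built-in functions resolvable alongside Flink's core functions
tableEnv.executeSql("USE MODULES hive, core");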
/**
* Configuration options for HiveModule
*/
public class HiveModuleOptions {
/**
* Hive version for function compatibility
* Default: null (uses latest supported)
*/
public static final ConfigOption<String> HIVE_VERSION =
ConfigOptions.key("hive-version")
.stringType()
.noDefaultValue()
.withDescription("Hive version for function compatibility.");
}
Configuration options specific to streaming Hive sources.
/**
* Configuration options from FileSystemConnectorOptions used by Hive connector
*/
public class FileSystemConnectorOptions {
/**
* Enable streaming source mode for partition monitoring
* Default: false
*/
public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
ConfigOptions.key("streaming-source.enable")
.booleanType()
.defaultValue(false)
.withDescription("Enable streaming source mode for partition monitoring.");
/**
* Which partitions to include in streaming mode
* Options: "all", "latest"
* Default: "all"
*/
public static final ConfigOption<String> STREAMING_SOURCE_PARTITION_INCLUDE =
ConfigOptions.key("streaming-source.partition.include")
.stringType()
.defaultValue("all")
.withDescription("Which partitions to include: 'all' or 'latest'.");
/**
* Interval for monitoring new partitions (in milliseconds)
* Default: 60000 (1 minute)
*/
public static final ConfigOption<Long> STREAMING_SOURCE_MONITOR_INTERVAL =
ConfigOptions.key("streaming-source.monitor-interval")
.longType()
.defaultValue(60000L)
.withDescription("Interval for monitoring new partitions in milliseconds.");
/**
* Configured parallelism for sink operations
* Default: null (uses default parallelism)
*/
public static final ConfigOption<Integer> SINK_PARALLELISM =
ConfigOptions.key("sink.parallelism")
.intType()
.noDefaultValue()
.withDescription("Configured parallelism for sink operations.");
}
Utility classes for managing Hadoop and Hive configuration.
/**
* Utilities for managing Hive configuration
*/
public class HiveConfUtils {
/**
* Create HiveConf from configuration directory
* @param hiveConfDir - Directory containing hive-site.xml (can be null)
* @return Configured HiveConf instance
*/
public static HiveConf create(String hiveConfDir);
/**
* Get Hive configuration with custom properties
* @param hiveConf - Base Hive configuration
* @param customProps - Additional properties to set
* @return Updated HiveConf instance
*/
public static HiveConf create(HiveConf hiveConf, Map<String, String> customProps);
}
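A minimal sketch of combining the two overloads, assuming a hive-site.xml under /opt/hive/conf (path illustrative):
// Build a base HiveConf from a config directory, then layer custom properties on top
HiveConf base = HiveConfUtils.create("/opt/hive/conf");
Map<String, String> overrides = new HashMap<>();
overrides.put("hive.metastore.client.socket.timeout", "600s");
HiveConf tuned = HiveConfUtils.create(base, overrides);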
/**
* Utilities for managing Hadoop JobConf
*/
public class JobConfUtils {
/**
* Create JobConf with security credentials
* @param hiveConf - Hive configuration to base JobConf on
* @return JobConf with security credentials configured
*/
public static JobConf createJobConfWithCredentials(HiveConf hiveConf);
/**
* Create JobConf with custom properties
* @param hiveConf - Base Hive configuration
* @param extraConf - Additional configuration properties
* @return Configured JobConf instance
*/
public static JobConf createJobConf(HiveConf hiveConf, Map<String, String> extraConf);
}
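A short sketch of deriving a JobConf for secure clusters, continuing from the HiveConf built in the previous sketch (same assumptions):
// Wrap the HiveConf in a JobConf, carrying over Kerberos/delegation-token credentials
JobConf jobConf = JobConfUtils.createJobConfWithCredentials(tuned);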
/**
* Factory for creating Hadoop FileSystem instances
*/
public class HadoopFileSystemFactory {
/**
* Create file system factory with configuration
* @param hadoopConf - Hadoop configuration
*/
public HadoopFileSystemFactory(org.apache.hadoop.conf.Configuration hadoopConf);
/**
* Create file system for given URI
* @param fsUri - File system URI
* @return FileSystem instance
* @throws IOException if creation fails
*/
public FileSystem create(URI fsUri) throws IOException;
}
Configuration for the dynamic table factory system.
/**
* Factory for creating Hive dynamic table sources and sinks
*/
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
/**
* Get factory identifier (throws UnsupportedOperationException)
* The Hive factory only works through a catalog, not standalone
* @return Not supported
* @throws UnsupportedOperationException always
*/
public String factoryIdentifier();
/**
* Get required options (throws UnsupportedOperationException)
* @return Not supported
* @throws UnsupportedOperationException always
*/
public Set<ConfigOption<?>> requiredOptions();
/**
* Get optional options (throws UnsupportedOperationException)
* @return Not supported
* @throws UnsupportedOperationException always
*/
public Set<ConfigOption<?>> optionalOptions();
/**
* Create dynamic table source based on context
* @param context - Creation context with catalog table info
* @return DynamicTableSource implementation
*/
public DynamicTableSource createDynamicTableSource(Context context);
/**
* Create dynamic table sink based on context
* @param context - Creation context with catalog table info
* @return DynamicTableSink implementation
*/
public DynamicTableSink createDynamicTableSink(Context context);
}
Usage Examples:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveOptions;
import java.util.HashMap;
import java.util.Map;
// Configure Hive connector options
Configuration config = new Configuration();
// Use native Flink readers for better performance
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER, false);
// Use native Flink writers for better performance
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_WRITER, false);
// Enable source parallelism inference
config.setBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM, true);
config.setInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX, 500);
// Create table environment with configuration
TableEnvironment tableEnv = TableEnvironment.create(
EnvironmentSettings.newInstance()
.inBatchMode()
.withConfiguration(config)
.build()
);
// Create Hive catalog with full configuration options
Map<String, String> catalogOptions = new HashMap<>();
catalogOptions.put("type", "hive");
catalogOptions.put("default-database", "analytics");
catalogOptions.put("hive-conf-dir", "/opt/hive/conf");
catalogOptions.put("hadoop-conf-dir", "/opt/hadoop/etc/hadoop");
catalogOptions.put("hive-version", "2.3.6");
// Use catalog factory to create catalog
CatalogFactory.Context context = new CatalogFactory.Context() {
public String getName() { return "hive_catalog"; }
public Map<String, String> getOptions() { return catalogOptions; }
public ReadableConfig getConfiguration() { return Configuration.fromMap(catalogOptions); }
public ClassLoader getClassLoader() { return Thread.currentThread().getContextClassLoader(); }
};
HiveCatalogFactory factory = new HiveCatalogFactory();
Catalog hiveCatalog = factory.createCatalog(context);
tableEnv.registerCatalog("hive_catalog", hiveCatalog);
tableEnv.useCatalog("hive_catalog");// Configure streaming Hive source
tableEnv.executeSql(
"CREATE TABLE streaming_events (" +
" event_id BIGINT," +
" user_id BIGINT," +
" event_time TIMESTAMP(3)," +
" event_type STRING," +
" partition_date STRING" +
") PARTITIONED BY (partition_date) " +
"STORED AS PARQUET " +
"TBLPROPERTIES (" +
" 'streaming-source.enable' = 'true'," +
" 'streaming-source.partition.include' = 'all'," +
" 'streaming-source.monitor-interval' = '30000'" + // 30 seconds
")"
);
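The same streaming options also support a 'latest' mode: a table that should expose only the newest partition (for example, a dimension table) sets 'streaming-source.partition.include' to 'latest'. A sketch with an illustrative table name and schema:
tableEnv.executeSql(
"CREATE TABLE dim_users (" +
" user_id BIGINT," +
" user_name STRING," +
" partition_date STRING" +
") PARTITIONED BY (partition_date) " +
"STORED AS PARQUET " +
"TBLPROPERTIES (" +
" 'streaming-source.enable' = 'true'," +
" 'streaming-source.partition.include' = 'latest'" +
")"
);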
// Query streaming table
Table result = tableEnv.sqlQuery(
"SELECT event_type, COUNT(*) as event_count " +
"FROM streaming_events " +
"WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1' HOUR " +
"GROUP BY event_type"
);
// Load Hive module with specific version
Map<String, String> moduleOptions = new HashMap<>();
moduleOptions.put("hive-version", "2.3.6");
ModuleFactory.Context moduleContext = new ModuleFactory.Context() {
public Map<String, String> getOptions() { return moduleOptions; }
public ReadableConfig getConfiguration() { return Configuration.fromMap(moduleOptions); }
public ClassLoader getClassLoader() { return Thread.currentThread().getContextClassLoader(); }
};
HiveModuleFactory moduleFactory = new HiveModuleFactory();
Module hiveModule = moduleFactory.createModule(moduleContext);
tableEnv.loadModule("hive", hiveModule);public interface ConfigOption<T> {
/**
* Get the option key
* @return Configuration key string
*/
String key();
/**
* Get the default value
* @return Default value for this option
*/
T defaultValue();
/**
* Get the option description
* @return Human-readable description
*/
String description();
}
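Options defined with ConfigOption are read back through a ReadableConfig. A small sketch, assuming the tableEnv from the usage examples above:
// Read a connector option from the current table configuration
ReadableConfig conf = tableEnv.getConfig().getConfiguration();
boolean fallbackReader = conf.get(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER);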
public interface CatalogFactory extends Factory {
/**
* Create catalog from context
* @param context - Creation context
* @return Catalog instance
*/
Catalog createCatalog(Context context);
/**
* Context interface for catalog creation
*/
interface Context {
String getName();
Map<String, String> getOptions();
ReadableConfig getConfiguration();
ClassLoader getClassLoader();
}
}
public interface ModuleFactory extends Factory {
/**
* Create module from context
* @param context - Creation context
* @return Module instance
*/
Module createModule(Context context);
/**
* Context interface for module creation
*/
interface Context {
Map<String, String> getOptions();
ReadableConfig getConfiguration();
ClassLoader getClassLoader();
}
}
public interface Factory {
/**
* Get unique factory identifier
* @return Factory identifier string
*/
String factoryIdentifier();
/**
* Get required configuration options
* @return Set of required options
*/
Set<ConfigOption<?>> requiredOptions();
/**
* Get optional configuration options
* @return Set of optional options
*/
Set<ConfigOption<?>> optionalOptions();
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11