High-level Table API integration providing HiveTableSource and HiveTableSink for seamless SQL access to Hive tables with pushdown optimizations.
Streaming and batch source for Hive tables in the Table API, with support for partition pruning, projection pushdown, and limit pushdown optimizations.
/**
* Streaming and batch source for Hive tables in the Table API
* Implements multiple pushdown interfaces for query optimization
*/
class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
SupportsProjectionPushDown, SupportsLimitPushDown {
// Created automatically when accessing Hive tables through registered catalog
// Supports both streaming and batch execution modes
// Provides automatic schema inference from Hive metastore
}
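As a hedged sketch (the tableEnv and the sales_data table come from the usage examples further below, and the option names are the streaming-source options documented for the Hive connector), streaming reading of newly added partitions can be switched on per query with a dynamic table options hint:
// Hedged sketch: continuously read new partitions of a Hive table by enabling
// the connector's streaming-source options for this query only.
tableEnv.executeSql(
"SELECT * FROM sales_data " +
"/*+ OPTIONS('streaming-source.enable'='true', " +
"'streaming-source.monitor-interval'='1 min', " +
"'streaming-source.consume-start-offset'='2023-12-01') */"
).print();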
/**
* Partition pushdown optimization interface
* Allows Flink to prune partitions at planning time
*/
interface SupportsPartitionPushDown {
/**
* Apply partition pruning based on query predicates
* @param remainingPartitions - List of partition specifications that match query predicates
*/
void applyPartitions(List<Map<String, String>> remainingPartitions);
}
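For illustration only (the column name and value are assumptions, not part of the connector API), the partition specifications passed to applyPartitions are plain key-to-value maps, one per partition that survives pruning:
import java.util.Collections;
import java.util.List;
import java.util.Map;
// Illustrative sketch: for WHERE partition_date = '2023-12-15' on a table
// partitioned by partition_date, the planner hands over a one-element list.
List<Map<String, String>> remainingPartitions =
Collections.singletonList(Collections.singletonMap("partition_date", "2023-12-15"));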
/**
* Projection pushdown optimization interface
* Allows reading only required columns from source
*/
interface SupportsProjectionPushDown {
/**
* Check if nested field projection is supported
* @return true if nested projection is supported
*/
boolean supportsNestedProjection();
/**
* Apply column projection to reduce data reading
* @param projectedFields - Array of field indices to project, supports nested fields
*/
void applyProjection(int[][] projectedFields);
}
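For illustration only, each entry of projectedFields is a path of field indices into the produced row type: the first index picks a top-level field, any further indices descend into nested rows (the example field names are assumptions):
// Illustrative sketch: project top-level field 0 plus the nested field at
// index 1 inside top-level field 2 (e.g. customer_id and address.city).
int[][] projectedFields = new int[][] {
{0},    // customer_id
{2, 1}  // address.city
};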
/**
* Limit pushdown optimization interface
* Allows applying LIMIT clause at source level
*/
interface SupportsLimitPushDown {
/**
* Apply limit to reduce amount of data read from source
* @param limit - Maximum number of rows to read
*/
void applyLimit(long limit);
}
Usage Examples:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
// Register Hive catalog
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
String hiveConfDir = "/opt/hive/conf";           // directory containing hive-site.xml (example path)
String hadoopConfDir = "/opt/hadoop/etc/hadoop"; // example path
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", hiveConfDir, hadoopConfDir, "2.3.6");
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");
// Use Hive tables with automatic optimizations
// Partition pruning example
tableEnv.executeSql(
"SELECT customer_id, order_total " +
"FROM sales_data " +
"WHERE partition_date >= '2023-12-01' AND partition_date < '2024-01-01'"
).print();
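To confirm that pruning actually happened, the same statement can be prefixed with EXPLAIN; the optimized plan typically shows which partitions and columns remain at the Hive source (a hedged sketch reusing the query above):
// Hedged sketch: inspect the optimized plan to verify partition pruning
// (and any other pushdowns) applied to the Hive source.
tableEnv.executeSql(
"EXPLAIN SELECT customer_id, order_total " +
"FROM sales_data " +
"WHERE partition_date >= '2023-12-01' AND partition_date < '2024-01-01'"
).print();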
// Projection pushdown example (only reads specified columns)
tableEnv.executeSql(
"SELECT customer_name, email " +
"FROM customer_profiles " +
"WHERE status = 'active'"
).print();
// Limit pushdown example (the limit can be pushed into the scan when no ordering forces a full read)
tableEnv.executeSql(
"SELECT * FROM large_table " +
"LIMIT 1000"
).print();
// Complex query with multiple optimizations
tableEnv.executeSql(
"SELECT c.customer_name, s.total_amount " +
"FROM customer_profiles c " +
"JOIN sales_summary s ON c.customer_id = s.customer_id " +
"WHERE c.region = 'North America' " +
" AND s.sale_date >= '2023-01-01' " +
"ORDER BY s.total_amount DESC " +
"LIMIT 100"
).print();
Sink for writing data to Hive tables with support for overwriting, partitioning, and various output formats.
/**
* Sink for writing to Hive tables
* Supports partitioned and non-partitioned tables with multiple write modes
*/
class HiveTableSink implements DynamicTableSink, SupportsOverwrite, SupportsPartitioning {
// Created automatically when writing to Hive tables through registered catalog
// Supports both streaming and batch writing modes
// Handles automatic file format selection based on table definition
}
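The sink can also be driven from the Table API instead of SQL strings; a minimal sketch, assuming the raw_orders and sales_data tables used in the SQL examples below:
import org.apache.flink.table.api.Table;
// Minimal sketch: write a query result into a Hive table through the Table API.
Table processed = tableEnv.sqlQuery(
"SELECT customer_id, product_id, amount, order_date, 'processed' AS status " +
"FROM raw_orders");
processed.executeInsert("sales_data");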
/**
* Overwrite support interface
* Allows overwriting existing table data or specific partitions
*/
interface SupportsOverwrite {
/**
* Apply overwrite mode for the sink
* @param overwrite - Whether existing data in the table (or in the written partitions) should be overwritten
*/
void applyOverwrite(boolean overwrite);
}
/**
* Partitioning support interface
* Enables writing to partitioned tables with automatic partition creation
*/
interface SupportsPartitioning {
/**
* Apply static partition specification
* @param partition - Map of partition key-value pairs
*/
void applyStaticPartition(Map<String, String> partition);
}
Usage Examples:
// Insert data into Hive table
tableEnv.executeSql(
"INSERT INTO sales_data " +
"SELECT customer_id, product_id, amount, order_date, 'processed' as status " +
"FROM raw_orders " +
"WHERE order_date >= CURRENT_DATE"
);
// Insert with partition specification
tableEnv.executeSql(
"INSERT INTO partitioned_sales " +
"PARTITION (year='2023', month='12') " +
"SELECT customer_id, product_id, amount, order_date " +
"FROM daily_orders " +
"WHERE order_date >= '2023-12-01'"
);
// Overwrite specific partition
tableEnv.executeSql(
"INSERT OVERWRITE partitioned_sales " +
"PARTITION (year='2023', month='12') " +
"SELECT customer_id, product_id, amount, order_date " +
"FROM corrected_orders " +
"WHERE order_date BETWEEN '2023-12-01' AND '2023-12-31'"
);
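Note that Flink only supports INSERT OVERWRITE in batch execution mode; a minimal sketch of creating a batch TableEnvironment for such overwrite jobs:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
// INSERT OVERWRITE statements must run in batch mode; streaming pipelines
// should use a separate batch TableEnvironment for the overwrite step.
TableEnvironment batchEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());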
// Dynamic partitioning (partitions determined by data)
tableEnv.executeSql(
"INSERT INTO sales_by_region " +
"SELECT customer_id, product_id, amount, order_date, region " +
"FROM enriched_orders"
);
// Create table as select (CTAS)
tableEnv.executeSql(
"CREATE TABLE customer_summary " +
"PARTITIONED BY (region STRING) " +
"STORED AS PARQUET " +
"AS SELECT " +
" customer_id, " +
" customer_name, " +
" SUM(order_total) as total_spent, " +
" COUNT(*) as order_count, " +
" region " +
"FROM customer_orders " +
"GROUP BY customer_id, customer_name, region"
);
Lookup table source for performing lookup joins with Hive tables, supporting caching and efficient point queries.
/**
* Provides lookup join capability for Hive tables
* Enables efficient point queries for enriching streaming data
*/
class HiveLookupTableSource implements LookupTableSource {
/**
* Get the runtime provider that performs point lookups on the Hive table
* @param context - Lookup context carrying the lookup key indices and field information
* @return LookupRuntimeProvider wrapping the TableFunction used for lookup operations
*/
LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
}
Usage Examples:
// Lookup join example in SQL
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id BIGINT," +
" customer_id BIGINT," +
" product_id BIGINT," +
" amount DECIMAL(10,2)," +
" order_time TIMESTAMP(3)," +
" WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'orders'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")"
);
// Register Hive table for lookup
// (assuming customer_profiles table exists in registered Hive catalog)
// Perform lookup join (processing-time temporal join against the Hive table)
tableEnv.executeSql(
"SELECT " +
" o.order_id," +
" o.customer_id," +
" c.customer_name," +
" c.customer_email," +
" o.amount," +
" o.order_time " +
"FROM orders o " +
"LEFT JOIN customer_profiles FOR SYSTEM_TIME AS OF o.order_time AS c " +
" ON o.customer_id = c.customer_id"
).print();
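For dimension tables that are refreshed partition by partition, the connector can also join against only the latest partition, controlled by the streaming-source options documented for the Hive connector; a hedged sketch reusing the tables above:
// Hedged sketch: temporal join against the latest partition of the Hive
// dimension table, reloaded on the configured monitor interval.
tableEnv.executeSql(
"SELECT o.order_id, c.customer_name, o.amount " +
"FROM orders o " +
"JOIN customer_profiles " +
"/*+ OPTIONS('streaming-source.enable'='true', " +
"'streaming-source.partition.include'='latest', " +
"'streaming-source.monitor-interval'='12 h', " +
"'streaming-source.partition-order'='partition-name') */ " +
"FOR SYSTEM_TIME AS OF o.proc_time AS c " +
"ON o.customer_id = c.customer_id"
).print();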
// Configure lookup caching: 'lookup.join.cache.ttl' is a table option that is
// typically supplied per query via a dynamic table options hint, e.g.
//   LEFT JOIN customer_profiles /*+ OPTIONS('lookup.join.cache.ttl'='30 min') */
//   FOR SYSTEM_TIME AS OF o.proc_time AS c ON o.customer_id = c.customer_id
Modern dynamic table factory for creating HiveTableSource and HiveTableSink instances automatically from catalog metadata.
/**
* Modern dynamic table factory creating HiveTableSource and HiveTableSink instances for Table API
* Used internally by Flink when accessing Hive tables through registered catalogs
*/
class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
/**
* Create HiveTableSource from table context
* @param context - Context containing table schema, options, and catalog information
* @return Configured HiveTableSource instance
*/
DynamicTableSource createDynamicTableSource(Context context);
/**
* Create HiveTableSink from table context
* @param context - Context containing table schema, options, and catalog information
* @return Configured HiveTableSink instance
*/
DynamicTableSink createDynamicTableSink(Context context);
/**
* Get factory identifier
* @return "hive" - the factory identifier
*/
String factoryIdentifier();
}
Create Hive tables directly from Flink SQL using the Hive dialect:
-- Use the Hive dialect so that the DDL below creates native Hive tables
-- in the current HiveCatalog
SET table.sql-dialect=hive;
-- Create non-partitioned table
CREATE TABLE customer_data (
customer_id BIGINT,
customer_name STRING,
email STRING,
registration_date DATE
);
-- Create partitioned table with streaming-sink partition commit options
CREATE TABLE sales_data (
order_id BIGINT,
customer_id BIGINT,
amount DECIMAL(10,2),
order_date DATE
) PARTITIONED BY (
year INT,
month INT
) TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern' = '$year-$month-01 00:00:00',
'sink.partition-commit.trigger' = 'partition-time',
'sink.partition-commit.delay' = '1 h',
'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
-- Create table with a specific file format
CREATE TABLE parquet_data (
id BIGINT,
name STRING,
value DOUBLE
) STORED AS PARQUET;
The Table API integration provides several optimization features: