High-level Table API integration for reading from and writing to Hive tables with advanced optimizations including partition pruning, projection pushdown, limit pushdown, and streaming support.
Table source for reading data from Hive tables with comprehensive optimization support.
/**
* Table source for reading data from Hive tables with optimization support
*/
public class HiveTableSource implements ScanTableSource, SupportsPartitionPushDown,
        SupportsProjectionPushDown, SupportsLimitPushDown {
    /** Get runtime provider for scanning operations */
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    /** Get supported changelog mode */
    public ChangelogMode getChangelogMode();
    /** Apply partition pruning optimization */
    public void applyPartitions(List<Map<String, String>> remainingPartitions);
    /** Apply column projection optimization */
    public void applyProjection(int[][] projectedFields);
    /** Apply limit pushdown optimization */
    public void applyLimit(long limit);
    /** Create copy of the source */
    public DynamicTableSource copy();
    /** Get source summary for planning */
    public String asSummaryString();
}

Usage Examples:
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Table;
// Register Hive catalog
TableEnvironment tableEnv = TableEnvironment.create(settings);
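// Illustrative HiveCatalog instance for the registration below; the constructor takes the
// catalog name, a default database, and a directory containing hive-site.xml (the values here
// are placeholders). Requires org.apache.flink.table.catalog.hive.HiveCatalog on the classpath.
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf");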
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");
// Read from Hive table with SQL
Table result = tableEnv.sqlQuery("SELECT id, name FROM my_table WHERE year = '2023'");
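// Optionally execute the query and print the rows to the client for quick inspection
result.execute().print();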
// Read with partition pruning
Table filtered = tableEnv.sqlQuery(
"SELECT * FROM partitioned_table WHERE partition_col = 'value'"
);
// Read with column projection and limit
Table limited = tableEnv.sqlQuery(
"SELECT col1, col2 FROM large_table LIMIT 1000"
);

Table sink for writing data to Hive tables with partitioning and overwrite support.
/**
* Table sink for writing data to Hive tables with partitioning support
*/
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    /** Get runtime provider for writing operations */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    /** Apply static partitioning */
    public void applyStaticPartition(Map<String, String> partition);
    /** Apply overwrite mode */
    public void applyOverwrite(boolean overwrite);
    /** Check if partition grouping is required */
    public boolean requiresPartitionGrouping(boolean supportsGrouping);
    /** Create copy of the sink */
    public DynamicTableSink copy();
    /** Get sink summary for planning */
    public String asSummaryString();
}

Usage Examples:
// Write to Hive table with SQL
tableEnv.executeSql("INSERT INTO my_table SELECT * FROM source_table");
// Write with static partitioning
tableEnv.executeSql(
"INSERT INTO partitioned_table PARTITION (year = '2023', month = '01') " +
"SELECT id, name FROM source_table"
);
// Overwrite existing data
tableEnv.executeSql("INSERT OVERWRITE my_table SELECT * FROM updated_data");
// Write with dynamic partitioning
tableEnv.executeSql(
"INSERT INTO partitioned_table " +
"SELECT id, name, year, month FROM source_with_partitions"
);

Specialized table source for temporal joins and lookup operations with Hive tables.
/**
* Lookup table source for temporal joins with Hive tables
* Extends HiveTableSource and implements LookupTableSource interface
*/
public class HiveLookupTableSource extends HiveTableSource implements LookupTableSource {
    /** Get lookup runtime provider */
    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context);
    /** Create copy of the lookup source */
    public DynamicTableSource copy();
}

Usage Examples:
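A processing-time temporal join needs a processing-time attribute on the probe side. A minimal sketch of the Orders table used below, with the datagen connector standing in purely so the snippet is self-contained:
// Probe-side table with a processing-time attribute (proc_time); the datagen connector is
// used here only for illustration
tableEnv.executeSql(
    "CREATE TABLE Orders (" +
    "  product_id BIGINT," +
    "  quantity INT," +
    "  proc_time AS PROCTIME()" +
    ") WITH ('connector' = 'datagen')"
);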
// Temporal join with Hive dimension table
Table orders = tableEnv.from("Orders");
Table result = tableEnv.sqlQuery(
"SELECT o.*, d.description " +
"FROM Orders AS o " +
"JOIN dimension_table FOR SYSTEM_TIME AS OF o.proc_time AS d " +
"ON o.product_id = d.id"
);
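The lookup source caches the dimension table and reloads it when the cache expires; the connector's lookup.join.cache.ttl option controls that interval. One way to set it, sketched here with an illustrative table name and TTL value, is as a table property through the Hive SQL dialect:
// Uses org.apache.flink.table.api.SqlDialect to switch to the Hive dialect and back
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql(
    "ALTER TABLE dimension_table SET TBLPROPERTIES ('lookup.join.cache.ttl'='12 h')"
);
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);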
Factory for creating Hive table sources and sinks from configuration.
/**
* Factory for creating Hive dynamic table sources and sinks
*/
public class HiveDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    /** Create dynamic table source */
    public DynamicTableSource createDynamicTableSource(Context context);
    /** Create dynamic table sink */
    public DynamicTableSink createDynamicTableSink(Context context);
    /** Get factory identifier */
    public String factoryIdentifier();
    /** Get required configuration options */
    public Set<ConfigOption<?>> requiredOptions();
    /** Get optional configuration options */
    public Set<ConfigOption<?>> optionalOptions();
}

Backward compatibility support for older Flink versions.
/**
* Legacy table factory for backward compatibility
* @deprecated Use HiveDynamicTableFactory instead
*/
@Deprecated
public class HiveTableFactory implements TableSourceFactory<Row>, TableSinkFactory<Row> {
    /** Create table source */
    public TableSource<Row> createTableSource(Context context);
    /** Create table sink */
    public TableSink<Row> createTableSink(Context context);
    /** Get required properties */
    public Map<String, String> requiredContext();
    /** Get supported properties */
    public List<String> supportedProperties();
}

The connector automatically prunes partitions based on SQL WHERE clauses, significantly reducing data scanning:
// Only scans partitions matching the filter
SELECT * FROM sales WHERE year = 2023 AND month = 'January'

Column projection reduces data transfer by only reading required columns:
// Only reads 'id' and 'name' columns
SELECT id, name FROM wide_table WHERE active = true

Limit operations are pushed to the source to reduce data processing:
// Stops reading after 100 records
SELECT * FROM large_table LIMIT 100
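To check that these optimizations actually apply, the optimized plan can be printed with Table.explain(); pushed-down projections, partitions, and limits show up in the table source node (the exact plan text varies by Flink version):
// Print the plan for a query that combines projection and limit pushdown
System.out.println(
    tableEnv.sqlQuery("SELECT id, name FROM large_table LIMIT 100").explain()
);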
Tables can be read in streaming mode for continuous processing:
// Continuously monitor the table for new partitions by enabling streaming reads
// via dynamic table options
Table streamed = tableEnv.sqlQuery(
    "SELECT * FROM partitioned_table " +
    "/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.monitor-interval'='1 min') */"
);