Lower-level DataStream API integration providing fine-grained control over Hive data processing with custom formats, transformations, and advanced source configuration options.
Unified data source for reading Hive tables with custom data types and advanced configuration.
/**
* Unified data source for reading Hive tables with custom data types
*/
@PublicEvolving
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
/** Get split serializer for checkpoint operations */
public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();
/** Get enumerator checkpoint serializer */
public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> getEnumeratorCheckpointSerializer();
/** Create split enumerator for assigning splits to readers */
public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);
/** Restore split enumerator from checkpoint */
public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext, PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}

Usage Examples:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Build and use HiveSource (the HiveSourceBuilder constructor arguments, such as the Hive/Hadoop
// configuration and the target database and table name, are omitted here for brevity)
HiveSource<RowData> source = new HiveSourceBuilder()
.setProjectedFields(new int[]{0, 1, 2}) // Select specific columns
.setLimit(10000L) // Limit number of records
.buildWithDefaultBulkFormat();
// Create DataStream from source
DataStream<RowData> stream = env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
"hive-source"
);
// Process the stream
stream.map(row -> {
// Custom processing logic
return processRow(row);
}).print();
env.execute("Hive Streaming Job");

Builder pattern for constructing HiveSource instances with various configuration options.
/**
* Builder for constructing HiveSource instances with configuration options
*/
@PublicEvolving
public class HiveSourceBuilder {
/**
* Set projected fields for column pruning
* @param projectedFields - Array of column indices to read
* @return Builder instance for chaining
*/
public HiveSourceBuilder setProjectedFields(int[] projectedFields);
/**
* Set limit for number of records to read
* @param limit - Maximum number of records to read
* @return Builder instance for chaining
*/
public HiveSourceBuilder setLimit(Long limit);
/**
* Set specific partitions to read from
* @param partitions - List of partitions to include
* @return Builder instance for chaining
*/
public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
/**
* Build source with default bulk format
* @return Configured HiveSource instance
*/
public <T> HiveSource<T> buildWithDefaultBulkFormat();
}

Usage Examples:
import org.apache.flink.connectors.hive.HiveTablePartition;
// Build source with column projection
HiveSource<RowData> projectedSource = new HiveSourceBuilder()
.setProjectedFields(new int[]{0, 2, 5}) // Read columns 0, 2, and 5
.buildWithDefaultBulkFormat();
// Build source with partition filtering
// (getPartition(...) stands in for user code that resolves a HiveTablePartition from a partition spec)
List<HiveTablePartition> partitions = Arrays.asList(
getPartition("year=2023", "month=01"),
getPartition("year=2023", "month=02")
);
HiveSource<RowData> partitionedSource = new HiveSourceBuilder()
.setPartitions(partitions)
.setLimit(5000L)
.buildWithDefaultBulkFormat();
// Build source with all optimizations
HiveSource<RowData> optimizedSource = new HiveSourceBuilder()
.setProjectedFields(new int[]{0, 1, 3})
.setPartitions(selectedPartitions)
.setLimit(1000L)
.buildWithDefaultBulkFormat();

Represents a Hive table partition with metadata and storage information.
/**
* Represents a Hive table partition with metadata
*/
@PublicEvolving
public class HiveTablePartition {
/** Get storage descriptor for the partition */
public StorageDescriptor getStorageDescriptor();
/** Get partition specification as key-value pairs */
public LinkedHashMap<String, String> getPartitionSpec();
/** Get table properties */
public Properties getTableProperties();
/** Get partition location */
public String getLocation();
/** Check if partition is stored in a specific format */
public boolean isStoredAsSubDirectories();
}
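Usage Example (a minimal sketch that uses only the accessors listed above; the partitions list is assumed to come from user code or catalog metadata, as in the builder examples earlier):
// Inspect partition metadata before handing the partitions to HiveSourceBuilder#setPartitions
for (HiveTablePartition partition : partitions) {
    LinkedHashMap<String, String> spec = partition.getPartitionSpec(); // e.g. {year=2023, month=01}
    String location = partition.getLocation();                         // storage path of the partition's data files
    System.out.println("Reading partition " + spec + " from " + location);
}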
Represents a split for reading from Hive sources, containing partition and file information.
/**
* Represents a split for reading from Hive sources
*/
@PublicEvolving
public class HiveSourceSplit implements SourceSplit {
/** Get unique split identifier */
public String splitId();
/** Get associated Hive table partition */
public HiveTablePartition getHiveTablePartition();
/** Get file splits within this partition */
public List<FileSplit> getFileSplits();
/** Get reader schema for this split */
public TableSchema getReaderSchema();
}
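HiveSourceSplit instances are created and assigned by the framework, so user code rarely constructs them directly. The following is a minimal sketch of how a split round-trips through the checkpoint serializer exposed by HiveSource; the source and split variables are assumed to exist, and the serializer calls may throw IOException:
import org.apache.flink.core.io.SimpleVersionedSerializer;
SimpleVersionedSerializer<HiveSourceSplit> serializer = source.getSplitSerializer();
byte[] bytes = serializer.serialize(split);                                         // state as written to a checkpoint
HiveSourceSplit restored = serializer.deserialize(serializer.getVersion(), bytes);  // state as read back on restore
System.out.println("Restored split " + restored.splitId()
    + " for partition " + restored.getHiveTablePartition().getPartitionSpec());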
Configure continuous monitoring of Hive tables for new data:

// Options commonly tuned when reading Hive tables as a stream; continuous partition
// monitoring itself is enabled through the Hive connector's streaming-source table options (see the partition discovery example below)
Configuration config = new Configuration();
config.setString("table.exec.source.idle-timeout", "10s");
config.setBoolean("table.exec.hive.infer-source-parallelism", true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(config);

Process Hive data with custom transformations:
DataStream<RowData> hiveStream = env.fromSource(hiveSource, watermarkStrategy, "hive");
// Custom processing with DataStream API
DataStream<CustomRecord> processed = hiveStream
.map(new HiveRowDataMapper())
.filter(record -> record.isValid())
.keyBy(CustomRecord::getKey)
.window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
.aggregate(new CustomAggregator());
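The HiveRowDataMapper above is user-defined. The following is a minimal sketch, assuming the projected Hive columns are (id BIGINT, name STRING, amount DOUBLE) and that CustomRecord is a simple value class with a matching constructor:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.RowData;
public class HiveRowDataMapper implements MapFunction<RowData, CustomRecord> {
    @Override
    public CustomRecord map(RowData row) {
        long id = row.getLong(0);                   // column 0: BIGINT
        String name = row.getString(1).toString();  // column 1: STRING (StringData to String)
        double amount = row.getDouble(2);           // column 2: DOUBLE
        return new CustomRecord(id, name, amount);
    }
}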
Optimize for large batch processing workloads:

// Configure for batch processing
HiveSource<RowData> batchSource = new HiveSourceBuilder()
.setProjectedFields(requiredColumns)
.setLimit(null) // No limit for full table scan
.buildWithDefaultBulkFormat();
// Run with the DataStream API in BATCH execution mode (HiveSource is a unified source,
// so batch jobs also use StreamExecutionEnvironment)
StreamExecutionEnvironment batchEnv = StreamExecutionEnvironment.getExecutionEnvironment();
batchEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
batchEnv.getConfig().enableObjectReuse(); // Optimize for batch

Automatically discover and process new partitions:
// Enable continuous partition discovery via the Hive connector's streaming-source table options
// (these are table options rather than job configuration; they are set on the shared Configuration
// here only to keep the example self-contained)
config.setString("streaming-source.enable", "true");
config.setString("streaming-source.monitor-interval", "1 min"); // check for new partitions every minute
// Source will automatically discover new partitions
HiveSource<RowData> discoverySource = new HiveSourceBuilder()
.buildWithDefaultBulkFormat();