DataStream Source

Low-level streaming and batch source for reading Hive tables directly in DataStream API programs with full control over parallelism, partitioning, and data formats.

Capabilities

HiveSourceBuilder

Builder pattern class for creating configured HiveSource instances with specific partitions, limits, projections, and formats.

/**
 * Fluent API for configuring and building Hive sources
 * Provides comprehensive control over source behavior and data access patterns
 */
class HiveSourceBuilder {
    /**
     * Create a new HiveSourceBuilder instance from database and table names
     * @param jobConf - Hadoop JobConf with Hive configuration (metastore URI, etc.)
     * @param flinkConf - Flink configuration for connector settings
     * @param hiveVersion - Hive version string (e.g., "2.3.6"), nullable for auto-detection
     * @param dbName - Database name containing the target table
     * @param tableName - Name of the Hive table to read from
     * @param tableOptions - Additional table-specific options as key-value pairs
     */
    HiveSourceBuilder(@Nonnull JobConf jobConf, @Nonnull ReadableConfig flinkConf, 
                     @Nullable String hiveVersion, @Nonnull String dbName, 
                     @Nonnull String tableName, @Nonnull Map<String, String> tableOptions);
    
    /**
     * Create a new HiveSourceBuilder instance from CatalogTable
     * @param jobConf - Hadoop JobConf with Hive configuration (metastore URI, etc.)
     * @param flinkConf - Flink configuration for connector settings
     * @param tablePath - ObjectPath specifying database and table names
     * @param hiveVersion - Hive version string (e.g., "2.3.6"), nullable for auto-detection
     * @param catalogTable - CatalogTable instance with schema and configuration
     */
    HiveSourceBuilder(@Nonnull JobConf jobConf, @Nonnull ReadableConfig flinkConf,
                     @Nonnull ObjectPath tablePath, @Nullable String hiveVersion,
                     @Nonnull CatalogTable catalogTable);
    
    /**
     * Set specific partitions to read from (for partitioned tables)
     * If not set, reads from all partitions
     * @param partitions - List of HiveTablePartition instances to read
     * @return This builder instance for method chaining
     */
    HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    
    /**
     * Set maximum number of rows to read from the source
     * @param limit - Maximum row count, or null for no limit
     * @return This builder instance for method chaining
     */
    HiveSourceBuilder setLimit(Long limit);
    
    /**
     * Set column projection to read only specific fields
     * @param projectedFields - Array of column indices to project
     * @return This builder instance for method chaining
     */
    HiveSourceBuilder setProjectedFields(int[] projectedFields);
    
    /**
     * Build HiveSource with the default bulk format, producing RowData records
     * Automatically determines the appropriate format based on table schema
     * @return Configured HiveSource instance
     */
    HiveSource<RowData> buildWithDefaultBulkFormat();
    
    /**
     * Build HiveSource with custom bulk format for specific data types
     * @param bulkFormat - Custom BulkFormat implementation for parsing files
     * @return Configured HiveSource instance with custom type
     */
    <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}

Usage Examples:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create Hadoop job configuration with the Hive metastore and file system settings
JobConf jobConf = new JobConf();
jobConf.set("hive.metastore.uris", "thrift://localhost:9083");
jobConf.set("fs.defaultFS", "hdfs://namenode:8020");

// Flink configuration for connector settings
Configuration flinkConf = new Configuration();

// Build a basic Hive source reading the whole table
HiveSource<RowData> basicSource = new HiveSourceBuilder(
    jobConf,
    flinkConf,
    "2.3.6",
    "sales_db",
    "transactions",
    Collections.emptyMap()
).buildWithDefaultBulkFormat();

// Build a source limited to specific partitions
HiveConf hiveConf = new HiveConf(jobConf, HiveConf.class); // reuse the metastore settings

LinkedHashMap<String, String> partitionSpec = new LinkedHashMap<>();
partitionSpec.put("year", "2023");
partitionSpec.put("month", "12");

List<HiveTablePartition> partitions = Collections.singletonList(
    HiveTablePartition.ofPartition(hiveConf, "2.3.6", "sales_db", "transactions", partitionSpec)
);

HiveSource<RowData> partitionedSource = new HiveSourceBuilder(
    jobConf,
    flinkConf,
    "2.3.6",
    "sales_db", 
    "transactions",
    Collections.emptyMap()
)
.setPartitions(partitions)
.setLimit(10000L)
.setProjectedFields(new int[]{0, 2, 5}) // Read only columns 0, 2, and 5
.buildWithDefaultBulkFormat();

// Add to streaming job
env.fromSource(partitionedSource, WatermarkStrategy.noWatermarks(), "hive-source")
   .map(row -> processRow(row))
   .print();

env.execute("Hive DataStream Job");

HiveTablePartition

Data class representing Hive table or partition metadata, used for configuring specific data access patterns.

/**
 * Encapsulates Hive partition metadata and properties
 * Represents either a complete table or a specific partition within a table
 */
class HiveTablePartition {
    /**
     * Create partition representation for a complete table (non-partitioned)
     * @param storageDescriptor - Hive storage descriptor with location and format info
     * @param tableProps - Table properties from Hive metastore
     */
    HiveTablePartition(StorageDescriptor storageDescriptor, Properties tableProps);
    
    /**
     * Create partition representation for a specific partition
     * @param storageDescriptor - Hive storage descriptor with location and format info
     * @param partitionSpec - Partition key-value specification
     * @param tableProps - Table properties from Hive metastore
     */
    HiveTablePartition(StorageDescriptor storageDescriptor, 
                      Map<String, String> partitionSpec, Properties tableProps);
    
    /**
     * Get storage descriptor containing location, input/output formats, and SerDe info
     * @return Hive StorageDescriptor instance
     */
    StorageDescriptor getStorageDescriptor();
    
    /**
     * Get partition specification as key-value pairs
     * Returns empty map for non-partitioned tables
     * @return Map of partition key names to values
     */
    Map<String, String> getPartitionSpec();
    
    /**
     * Get table properties from Hive metastore
     * @return Properties instance with table configuration
     */
    Properties getTableProps();
    
    /**
     * Create HiveTablePartition for a complete table
     * @param hiveConf - Hive configuration instance
     * @param hiveVersion - Hive version string, nullable for auto-detection
     * @param dbName - Database name
     * @param tableName - Table name
     * @return HiveTablePartition representing the entire table
     */
    static HiveTablePartition ofTable(HiveConf hiveConf, @Nullable String hiveVersion, 
                                     String dbName, String tableName);
    
    /**
     * Create HiveTablePartition for a specific partition
     * @param hiveConf - Hive configuration instance
     * @param hiveVersion - Hive version string, nullable for auto-detection
     * @param dbName - Database name
     * @param tableName - Table name
     * @param partitionSpec - Partition specification as ordered key-value pairs
     * @return HiveTablePartition representing the specific partition
     */
    static HiveTablePartition ofPartition(HiveConf hiveConf, @Nullable String hiveVersion,
                                         String dbName, String tableName, 
                                         LinkedHashMap<String, String> partitionSpec);
}

Usage Examples:

import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.hive.conf.HiveConf;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;

// Create HiveConf from configuration
HiveConf hiveConf = new HiveConf();
hiveConf.set("hive.metastore.uris", "thrift://localhost:9083");

// Create partition for entire table
HiveTablePartition tablePartition = HiveTablePartition.ofTable(
    hiveConf, "2.3.6", "sales_db", "customer_data"
);

// Create a HiveTablePartition for a specific partition
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
partSpec.put("year", "2023");
partSpec.put("month", "12");
partSpec.put("day", "01");

HiveTablePartition specificPartition = HiveTablePartition.ofPartition(
    hiveConf, "2.3.6", "sales_db", "daily_sales", partSpec
);

// Use in source builder
List<HiveTablePartition> partitions = Arrays.asList(specificPartition);
HiveSource<RowData> source = new HiveSourceBuilder(jobConf, flinkConf, "2.3.6", 
    "sales_db", "daily_sales", Collections.emptyMap())
    .setPartitions(partitions)
    .buildWithDefaultBulkFormat();

HiveSource

Main source class for reading Hive data in DataStream API, supporting both streaming and batch execution modes.

/**
 * Primary source for reading data from Hive tables in DataStream API
 * Extends AbstractFileSource to provide file-based data access with Hive metadata integration
 */
class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
    // Source is typically created via HiveSourceBuilder rather than direct instantiation
    // Supports both streaming and batch execution modes
    // Provides automatic parallelism inference based on file splits
    // Handles various file formats (Parquet, ORC, text, etc.) transparently
}

Usage Examples:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.table.data.RowData;

import java.time.Duration;
import java.util.Collections;

// Create source via builder
HiveSource<RowData> source = new HiveSourceBuilder(jobConf, flinkConf, "2.3.6",
    "analytics", "web_events", Collections.emptyMap())
    .buildWithDefaultBulkFormat();

// Use in streaming mode with watermarks
DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.<RowData>forBoundedOutOfOrderness(Duration.ofMinutes(5))
        .withTimestampAssigner((event, timestamp) -> 
            event.getTimestamp(2, 3).getMillisecond()), // Extract timestamp from event
    "hive-events-source"
);

// Process the stream
stream
    .filter(row -> row.getString(1).toString().contains("purchase"))
    .map(row -> transformToEvent(row))
    .keyBy(event -> event.getUserId())
    .window(TumblingEventTimeWindows.of(Duration.ofHours(1)))
    .aggregate(new EventAggregator())
    .print();
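
The same source also works in bounded (batch) execution, as noted in the class comment above. A minimal sketch that reuses the source built above and switches the runtime mode to BATCH:

import org.apache.flink.api.common.RuntimeExecutionMode;

// Bounded read of the table's current contents; no watermarks needed
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

DataStream<RowData> boundedStream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "hive-batch-source"
);

boundedStream.print();
env.execute("Hive Batch Read");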

HiveSourceSplit

Represents a split of Hive data for parallel processing, extending FileSourceSplit for file-based data access.

/**
 * Represents a split of Hive data for parallel processing
 * Contains information about specific files or file ranges to be processed by a subtask
 */
class HiveSourceSplit extends FileSourceSplit {
    /**
     * Create HiveSourceSplit from Hadoop FileSplit
     * @param fileSplit - Hadoop MapReduce FileSplit
     * @param hiveTablePartition - Associated HiveTablePartition metadata
     * @param readerPosition - Optional checkpointed position for restarts
     */
    HiveSourceSplit(FileSplit fileSplit, HiveTablePartition hiveTablePartition,
                   @Nullable CheckpointedPosition readerPosition) throws IOException;
    
    /**
     * Create HiveSourceSplit with full parameters
     * @param id - Unique split identifier
     * @param filePath - Path to the file for this split
     * @param offset - Starting byte offset in the file
     * @param length - Number of bytes to read
     * @param fileModificationTime - File modification timestamp
     * @param fileSize - Total file size in bytes
     * @param hostnames - Preferred host locations for this split
     * @param readerPosition - Optional checkpointed position for restarts
     * @param hiveTablePartition - Associated HiveTablePartition metadata
     */
    HiveSourceSplit(String id, Path filePath, long offset, long length,
                   long fileModificationTime, long fileSize, String[] hostnames,
                   @Nullable CheckpointedPosition readerPosition,
                   HiveTablePartition hiveTablePartition);
    
    /**
     * Get the HiveTablePartition associated with this split
     * @return HiveTablePartition containing partition metadata
     */
    HiveTablePartition getHiveTablePartition();
    
    /**
     * Convert this split to a Hadoop MapReduce FileSplit
     * @return FileSplit compatible with MapReduce APIs
     */
    FileSplit toMapRedSplit();
    
    /**
     * Create new split with updated checkpointed position
     * @param position - New checkpointed position, nullable
     * @return New HiveSourceSplit instance with updated position
     */
    @Override
    FileSourceSplit updateWithCheckpointedPosition(@Nullable CheckpointedPosition position);
}
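
Usage Example:

Splits are normally created internally by the source's split enumerator, but the constructor and accessors above can be exercised directly, for example in tests. A minimal sketch based on the signatures documented here; the file metadata values are illustrative, hiveConf comes from the earlier examples, and the read subpackage location of HiveSourceSplit and the mapred FileSplit return type are assumptions:

import org.apache.flink.connectors.hive.HiveTablePartition;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.hadoop.mapred.FileSplit;

import java.util.Map;

// Partition metadata for the file below (whole, non-partitioned table here)
HiveTablePartition partition = HiveTablePartition.ofTable(
    hiveConf, "2.3.6", "sales_db", "transactions");

// Describe one file range to be read by a single subtask (illustrative values)
HiveSourceSplit split = new HiveSourceSplit(
    "split-0",                                   // unique split id
    new Path("hdfs://namenode:8020/warehouse/sales_db.db/transactions/part-00000"),
    0L,                                          // start offset
    128L * 1024 * 1024,                          // bytes to read
    1700000000000L,                              // file modification time
    128L * 1024 * 1024,                          // total file size
    new String[]{"datanode-1"},                  // preferred host locations
    null,                                        // no checkpointed reader position
    partition
);

// Inspect partition metadata and convert for MapReduce-based readers
Map<String, String> partitionSpec = split.getHiveTablePartition().getPartitionSpec();
FileSplit mapRedSplit = split.toMapRedSplit();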

Streaming Configuration

For streaming reads, additional options control partition monitoring and consumption. These keys are table options rather than Hadoop settings, so they are passed to the builder as the tableOptions map instead of being set on the JobConf:

import java.util.HashMap;
import java.util.Map;

Map<String, String> streamingOptions = new HashMap<>();

// Enable streaming mode for continuous partition monitoring
streamingOptions.put("streaming-source.enable", "true");

// Set monitoring interval for new partitions
streamingOptions.put("streaming-source.monitor-interval", "1min");

// Configure partition inclusion strategy
streamingOptions.put("streaming-source.partition.include", "latest");

// Set partition consumption order
streamingOptions.put("streaming-source.partition-order", "partition-time");

// Configure starting offset for consumption
streamingOptions.put("streaming-source.consume-start-offset", "2023-12-01");

File Format Support

The HiveSource automatically handles various file formats based on table metadata:

  • Parquet: Columnar format with predicate pushdown support
  • ORC: Optimized row columnar format with vectorized reading
  • Text: Delimited text files with configurable separators
  • Avro: Schema evolution support with embedded schemas
  • JSON: Semi-structured data with flexible schema handling
  • Custom: Support for custom InputFormats via the Hadoop API; an explicit BulkFormat can also be supplied via buildWithBulkFormat (see the sketch below)
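
When the built-in handling is not sufficient, a custom format can be plugged in through buildWithBulkFormat, documented earlier. A minimal sketch, where MyRecord and MyRecordBulkFormat are hypothetical types implementing BulkFormat<MyRecord, HiveSourceSplit> (they are not part of this connector), and jobConf/flinkConf/env come from the earlier examples:

import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connectors.hive.read.HiveSourceSplit;

// Hypothetical custom record type and format implementation
BulkFormat<MyRecord, HiveSourceSplit> myFormat = new MyRecordBulkFormat();

HiveSource<MyRecord> customSource = new HiveSourceBuilder(
    jobConf,
    flinkConf,
    "2.3.6",
    "sales_db",
    "transactions",
    Collections.emptyMap()
).buildWithBulkFormat(myFormat);

env.fromSource(customSource, WatermarkStrategy.noWatermarks(), "hive-custom-format")
   .print();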