CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11

Apache Flink SQL connector for Apache Hive 2.3.6 with Scala 2.11 binary compatibility

Pending
Overview
Eval results
Files

source-api.mddocs/

Source API

New Source API implementation for Hive tables providing enhanced control over split enumeration and reading with support for continuous partition monitoring, parallel reading, and efficient split management.

Capabilities

HiveSource

Main Source API implementation for reading Hive tables that extends AbstractFileSource and uses the new Flink Source interface.

/**
 * A unified data source that reads a hive table. HiveSource works on HiveSourceSplit and
 * uses BulkFormat to read the data. A built-in BulkFormat is provided to return records in
 * type of RowData. It's also possible to implement a custom BulkFormat to return data in
 * different types. Use HiveSourceBuilder to build HiveSource instances.
 *
 * @param <T> the type of record returned by this source
 */
@PublicEvolving
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
    /**
     * Package-private constructor used by HiveSourceBuilder
     * @param inputPaths - Array of input paths (typically contains single dummy path)
     * @param fileEnumerator - Provider for file enumeration
     * @param splitAssigner - Provider for split assignment
     * @param readerFormat - BulkFormat for reading records
     * @param continuousEnumerationSettings - Settings for continuous monitoring (null for batch)
     * @param jobConf - Hadoop JobConf with Hive configurations
     * @param tablePath - ObjectPath identifying the Hive table
     * @param partitionKeys - List of partition key names
     * @param fetcher - Continuous partition fetcher for streaming mode (can be null)
     * @param fetcherContext - Context for continuous partition fetching (can be null)
     */
    HiveSource(
            Path[] inputPaths,
            FileEnumerator.Provider fileEnumerator,
            FileSplitAssigner.Provider splitAssigner,
            BulkFormat<T, HiveSourceSplit> readerFormat,
            @Nullable ContinuousEnumerationSettings continuousEnumerationSettings,
            JobConf jobConf,
            ObjectPath tablePath,
            List<String> partitionKeys,
            @Nullable ContinuousPartitionFetcher<Partition, ?> fetcher,
            @Nullable HiveTableSource.HiveContinuousPartitionFetcherContext<?> fetcherContext);

    /**
     * Get serializer for HiveSourceSplit objects
     * @return Serializer instance for splits
     */
    @Override
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();

    /**
     * Get serializer for enumerator checkpoints
     * @return Serializer for checkpoint state
     */
    @Override
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>>
            getEnumeratorCheckpointSerializer();

    /**
     * Create split enumerator for discovering and assigning splits
     * @param enumContext - Context for enumerator creation
     * @return SplitEnumerator instance for managing splits
     */
    @Override
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>>
            createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);

    /**
     * Create split enumerator from checkpoint state
     * @param enumContext - Context for enumerator creation
     * @param checkpoint - Checkpoint state to restore from
     * @return SplitEnumerator instance restored from checkpoint
     */
    @Override
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>>
            restoreEnumerator(
                    SplitEnumeratorContext<HiveSourceSplit> enumContext,
                    PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}

HiveSourceBuilder

Builder pattern for creating configured HiveSource instances with validation and setup.

/**
 * Builder to build HiveSource instances.
 */
@PublicEvolving
public class HiveSourceBuilder {
    /**
     * Creates a builder to read a hive table using metastore information
     * @param jobConf - Holds hive and hadoop configurations
     * @param flinkConf - Holds flink configurations
     * @param hiveVersion - The version of hive in use, if null will be auto-detected
     * @param dbName - The name of the database the table belongs to
     * @param tableName - The name of the table
     * @param tableOptions - Additional options needed to read the table, which take precedence over table properties stored in metastore
     */
    public HiveSourceBuilder(
            @Nonnull JobConf jobConf,
            @Nonnull ReadableConfig flinkConf,
            @Nullable String hiveVersion,
            @Nonnull String dbName,
            @Nonnull String tableName,
            @Nonnull Map<String, String> tableOptions);

    /**
     * Creates a builder to read a hive table using catalog table information
     * @param jobConf - Holds hive and hadoop configurations
     * @param flinkConf - Holds flink configurations
     * @param tablePath - Path of the table to be read
     * @param hiveVersion - The version of hive in use, if null will be auto-detected
     * @param catalogTable - The table to be read
     */
    public HiveSourceBuilder(
            @Nonnull JobConf jobConf,
            @Nonnull ReadableConfig flinkConf,
            @Nonnull ObjectPath tablePath,
            @Nullable String hiveVersion,
            @Nonnull CatalogTable catalogTable);

    /**
     * Sets the partitions to read in batch mode. By default, batch source reads all partitions in a hive table.
     * @param partitions - List of specific partitions to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);

    /**
     * Sets the maximum number of records this source should return
     * @param limit - Maximum number of records to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setLimit(Long limit);

    /**
     * Sets the indices of projected fields
     * @param projectedFields - Indices of the fields to project, starting from 0
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);

    /**
     * Builds HiveSource with default built-in BulkFormat that returns records in type of RowData
     * @return HiveSource configured for RowData output
     */
    public HiveSource<RowData> buildWithDefaultBulkFormat();

    /**
     * Builds HiveSource with custom BulkFormat
     * @param bulkFormat - Custom BulkFormat for reading records
     * @return HiveSource configured with the provided BulkFormat
     */
    public <T> HiveSource<T> buildWithBulkFormat(BulkFormat<T, HiveSourceSplit> bulkFormat);
}

Split Management

Classes for managing source splits and their lifecycle.

/**
 * A sub-class of FileSourceSplit that contains extra information needed to read a hive table.
 */
@PublicEvolving
public class HiveSourceSplit extends FileSourceSplit {
    /**
     * Create HiveSourceSplit from Hadoop FileSplit
     * @param fileSplit - Hadoop FileSplit containing file and offset information
     * @param hiveTablePartition - Hive table partition metadata
     * @param readerPosition - Current reader position for checkpointing (can be null)
     * @throws IOException if split creation fails
     */
    public HiveSourceSplit(
            FileSplit fileSplit,
            HiveTablePartition hiveTablePartition,
            @Nullable CheckpointedPosition readerPosition) throws IOException;

    /**
     * Create HiveSourceSplit with explicit parameters
     * @param id - Unique identifier for this split
     * @param filePath - Path to the file this split reads
     * @param offset - Start position in the file
     * @param length - Length of data to read
     * @param hostnames - Preferred hosts for reading this split
     * @param readerPosition - Current reader position for checkpointing (can be null)
     * @param hiveTablePartition - Hive table partition metadata
     */
    public HiveSourceSplit(
            String id,
            Path filePath,
            long offset,
            long length,
            String[] hostnames,
            @Nullable CheckpointedPosition readerPosition,
            HiveTablePartition hiveTablePartition);

    /**
     * Get Hive table partition metadata for this split
     * @return HiveTablePartition containing partition information
     */
    public HiveTablePartition getHiveTablePartition();

    /**
     * Convert this split to MapReduce FileSplit format
     * @return FileSplit compatible with Hadoop MapReduce API
     */
    public FileSplit toMapRedSplit();

    /**
     * Update split with new checkpointed position
     * @param position - New checkpointed position (can be null)
     * @return New HiveSourceSplit with updated position
     */
    @Override
    public FileSourceSplit updateWithCheckpointedPosition(@Nullable CheckpointedPosition position);
}

/**
 * Serializer for HiveSourceSplit objects
 */
public class HiveSourceSplitSerializer implements SimpleVersionedSerializer<HiveSourceSplit> {
    public static final HiveSourceSplitSerializer INSTANCE = new HiveSourceSplitSerializer();

    /**
     * Get serializer version
     * @return Version number for compatibility
     */
    @Override
    public int getVersion();

    /**
     * Serialize split to bytes
     * @param split - Split to serialize
     * @return Serialized bytes
     * @throws IOException if serialization fails
     */
    @Override
    public byte[] serialize(HiveSourceSplit split) throws IOException;

    /**
     * Deserialize split from bytes
     * @param version - Serializer version
     * @param serialized - Serialized bytes
     * @return Deserialized split
     * @throws IOException if deserialization fails
     */
    @Override
    public HiveSourceSplit deserialize(int version, byte[] serialized) throws IOException;
}

Usage Examples:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.mapred.JobConf;

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure Hive source with Source API for streaming
JobConf jobConf = new JobConf();
// Configure jobConf with Hive settings...

Configuration flinkConf = new Configuration();
// Set streaming source properties
flinkConf.setString("streaming-source.enable", "true");
flinkConf.setString("streaming-source.monitor-interval", "1 min");

Map<String, String> tableOptions = new HashMap<>();
tableOptions.put("streaming-source.enable", "true");
tableOptions.put("streaming-source.monitor-interval", "1 min");

HiveSource<RowData> hiveSource = new HiveSourceBuilder(
        jobConf,
        flinkConf,
        null, // auto-detect Hive version
        "default",
        "streaming_events",
        tableOptions)
    .setProjectedFields(new int[]{0, 1, 3}) // Project specific columns
    .buildWithDefaultBulkFormat();

// Create data stream from Hive source
DataStreamSource<RowData> stream = env.fromSource(
    hiveSource,
    WatermarkStrategy.noWatermarks(),  
    "hive-source"
);

// Process the stream
stream
    .map(new ProcessRowDataFunction())
    .print();

env.execute("Hive Source API Example");
// Batch reading with Source API
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Configure specific partitions to read
List<HiveTablePartition> partitions = Arrays.asList(
    // Create partitions using HivePartitionUtils or other methods
);

HiveSource<RowData> batchSource = new HiveSourceBuilder(
        jobConf,
        flinkConf,
        null, // auto-detect Hive version  
        "sales",
        "orders",
        new HashMap<>())
    .setPartitions(partitions)
    .setLimit(10000L) // Limit to 10k records
    .buildWithDefaultBulkFormat();

DataStreamSource<RowData> batchStream = env.fromSource(
    batchSource,
    WatermarkStrategy.noWatermarks(),
    "hive-batch-source"
);

batchStream.print();
env.execute("Hive Batch Source Example");

Types

public abstract class AbstractFileSource<T, SplitT extends FileSourceSplit>
        implements Source<T, SplitT, PendingSplitsCheckpoint<SplitT>>, ResultTypeQueryable<T> {
    /**
     * Get boundedness of this source
     * @return Boundedness.BOUNDED for batch mode, CONTINUOUS_UNBOUNDED for streaming
     */
    @Override
    public Boundedness getBoundedness();
    
    /**
     * Create source reader
     * @param readerContext - Reader context provided by Flink runtime
     * @return SourceReader instance for reading splits
     */
    @Override
    public SourceReader<T, SplitT> createReader(SourceReaderContext readerContext);
    
    /**
     * Create split enumerator
     * @param enumContext - Enumerator context provided by Flink runtime
     * @return SplitEnumerator instance for managing splits
     * @throws Exception if creation fails
     */
    @Override
    public abstract SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> createEnumerator(
            SplitEnumeratorContext<SplitT> enumContext);

    /**
     * Restore split enumerator from checkpoint
     * @param enumContext - Enumerator context provided by Flink runtime
     * @param checkpoint - Checkpoint state to restore from
     * @return SplitEnumerator instance restored from checkpoint
     */
    @Override
    public abstract SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>> restoreEnumerator(
            SplitEnumeratorContext<SplitT> enumContext,
            PendingSplitsCheckpoint<SplitT> checkpoint);

    /**
     * Get split serializer
     * @return Serializer for split objects
     */
    @Override
    public abstract SimpleVersionedSerializer<SplitT> getSplitSerializer();

    /**
     * Get checkpoint serializer
     * @return Serializer for checkpoint objects
     */
    @Override
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<SplitT>>
            getEnumeratorCheckpointSerializer();

    /**
     * Get type information for produced records
     * @return TypeInformation for the output type
     */
    @Override
    public TypeInformation<T> getProducedType();
}

public class FileSourceSplit implements SourceSplit {
    /**
     * Get split identifier
     * @return Unique split ID string
     */
    @Override
    public String splitId();
    
    /**
     * Get file path for this split
     * @return Path to the file
     */
    public Path path();
    
    /**
     * Get start position in file
     * @return Start byte position
     */
    public long offset();
    
    /**
     * Get length of data to read
     * @return Length in bytes
     */
    public long length();
    
    /**
     * Get preferred hosts for locality
     * @return Array of host names
     */
    public String[] hostnames();
    
    /**
     * Get current reader position for checkpointing
     * @return Optional containing checkpointed position
     */
    public Optional<CheckpointedPosition> getReaderPosition();
}

public enum Boundedness {
    BOUNDED,
    CONTINUOUS_UNBOUNDED
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-sql-connector-hive-2-3-6-2-11

docs

catalog-operations.md

configuration.md

hive-functions.md

index.md

source-api.md

table-sinks.md

table-sources.md

tile.json