Unified Data Source

Lower-level DataStream API integration providing fine-grained control over Hive data processing with custom formats, transformations, and advanced source configuration options.

Capabilities

HiveSource

Unified data source for reading Hive tables with custom data types and advanced configuration.

/**
 * Unified data source for reading Hive tables with custom data types
 */
@PublicEvolving
public class HiveSource<T> extends AbstractFileSource<T, HiveSourceSplit> {
    
    /** Get split serializer for checkpoint operations */
    public SimpleVersionedSerializer<HiveSourceSplit> getSplitSerializer();
    
    /** Get enumerator checkpoint serializer */
    public SimpleVersionedSerializer<PendingSplitsCheckpoint<HiveSourceSplit>> getEnumeratorCheckpointSerializer();
    
    /** Create split enumerator for assigning splits to readers */
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> createEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext);
    
    /** Restore split enumerator from checkpoint */
    public SplitEnumerator<HiveSourceSplit, PendingSplitsCheckpoint<HiveSourceSplit>> restoreEnumerator(SplitEnumeratorContext<HiveSourceSplit> enumContext, PendingSplitsCheckpoint<HiveSourceSplit> checkpoint);
}

Usage Examples:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.connectors.hive.HiveSource;
import org.apache.flink.connectors.hive.HiveSourceBuilder;

// Create streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build and use HiveSource
HiveSource<RowData> source = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 1, 2})  // Select specific columns
    .setLimit(10000L)                        // Limit number of records
    .buildWithDefaultBulkFormat();

// Create DataStream from source
DataStream<RowData> stream = env.fromSource(
    source, 
    WatermarkStrategy.noWatermarks(), 
    "hive-source"
);

// Process the stream
stream.map(row -> {
    // Custom processing logic (processRow is an application-specific method)
    return processRow(row);
}).print();

env.execute("Hive Streaming Job");

HiveSourceBuilder

Builder pattern for constructing HiveSource instances with various configuration options.

/**
 * Builder for constructing HiveSource instances with configuration options
 */
@PublicEvolving
public class HiveSourceBuilder {
    
    /**
     * Set projected fields for column pruning
     * @param projectedFields - Array of column indices to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setProjectedFields(int[] projectedFields);
    
    /**
     * Set limit for number of records to read
     * @param limit - Maximum number of records to read
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setLimit(Long limit);
    
    /**
     * Set specific partitions to read from
     * @param partitions - List of partitions to include
     * @return Builder instance for chaining
     */
    public HiveSourceBuilder setPartitions(List<HiveTablePartition> partitions);
    
    /**
     * Build source with the default bulk format, producing RowData records
     * @return Configured HiveSource instance
     */
    public HiveSource<RowData> buildWithDefaultBulkFormat();
}

Usage Examples:

import org.apache.flink.connectors.hive.HiveTablePartition;
import java.util.Arrays;
import java.util.List;

// Build source with column projection
HiveSource<RowData> projectedSource = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 2, 5})  // Read columns 0, 2, and 5
    .buildWithDefaultBulkFormat();

// Build source with partition filtering
// (getPartition is an application-specific helper that resolves HiveTablePartition metadata)
List<HiveTablePartition> partitions = Arrays.asList(
    getPartition("year=2023", "month=01"),
    getPartition("year=2023", "month=02")
);

HiveSource<RowData> partitionedSource = new HiveSourceBuilder()
    .setPartitions(partitions)
    .setLimit(5000L)
    .buildWithDefaultBulkFormat();

// Build source with all optimizations
HiveSource<RowData> optimizedSource = new HiveSourceBuilder()
    .setProjectedFields(new int[]{0, 1, 3})
    .setPartitions(selectedPartitions)
    .setLimit(1000L)
    .buildWithDefaultBulkFormat();

HiveTablePartition

Represents a Hive table partition with metadata and storage information.

/**
 * Represents a Hive table partition with metadata
 */
@PublicEvolving
public class HiveTablePartition {
    
    /** Get storage descriptor for the partition */
    public StorageDescriptor getStorageDescriptor();
    
    /** Get partition specification as key-value pairs */
    public LinkedHashMap<String, String> getPartitionSpec();
    
    /** Get table properties */
    public Properties getTableProperties();
    
    /** Get partition location */
    public String getLocation();
    
    /** Check whether the partition data is stored as subdirectories */
    public boolean isStoredAsSubDirectories();
}
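
Usage Example:

A minimal sketch of inspecting partition metadata before handing partitions to HiveSourceBuilder. The partitions list and the way it is obtained (for example, via a catalog lookup helper) are assumptions, not part of this API.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;

// Inspect metadata for each partition that will be read (illustrative only)
for (HiveTablePartition partition : partitions) {
    LinkedHashMap<String, String> spec = partition.getPartitionSpec();
    String location = partition.getLocation();
    Properties tableProps = partition.getTableProperties();

    System.out.println("Partition " + spec + " stored at " + location
            + " (" + tableProps.size() + " table properties)");
}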

HiveSourceSplit

Represents a split for reading from Hive sources, containing partition and file information.

/**
 * Represents a split for reading from Hive sources
 */
@PublicEvolving
public class HiveSourceSplit implements SourceSplit {
    
    /** Get unique split identifier */
    public String splitId();
    
    /** Get associated Hive table partition */
    public HiveTablePartition getHiveTablePartition();
    
    /** Get file splits within this partition */
    public List<FileSplit> getFileSplits();
    
    /** Get reader schema for this split */
    public TableSchema getReaderSchema();
}
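
Usage Example:

Splits are normally created and assigned internally by the enumerator; the sketch below only illustrates round-tripping a split through the serializer that checkpointing uses. The source and split variables are assumed to come from an existing HiveSource setup or a test harness.

import org.apache.flink.core.io.SimpleVersionedSerializer;

// Serialize and restore a split the same way checkpointing does (illustrative only)
SimpleVersionedSerializer<HiveSourceSplit> serializer = source.getSplitSerializer();

byte[] bytes = serializer.serialize(split);
HiveSourceSplit restored = serializer.deserialize(serializer.getVersion(), bytes);

// Each split knows its partition and the files it covers
System.out.println("Split " + restored.splitId()
        + " reads partition " + restored.getHiveTablePartition().getPartitionSpec()
        + " over " + restored.getFileSplits().size() + " file split(s)");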

Advanced Usage Patterns

Streaming Mode Configuration

Tune the execution environment for streaming reads of Hive tables:

// General source options: mark idle sources so watermarks can progress, and let
// the Hive connector infer source parallelism from the number of splits
Configuration config = new Configuration();
config.setString("table.exec.source.idle-timeout", "10s");
config.setBoolean("table.exec.hive.infer-source-parallelism", true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(config);
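
The options above only tune source behavior; the continuous read itself is wired up as in the earlier HiveSource example. A minimal sketch, assuming the streaming-source table options described under Partition Discovery below are set on the table:

// Consume the table with the configured environment
HiveSource<RowData> streamingSource = new HiveSourceBuilder()
    .buildWithDefaultBulkFormat();

DataStream<RowData> continuous = env.fromSource(
    streamingSource,
    WatermarkStrategy.noWatermarks(),
    "hive-streaming-source");

continuous.print();
env.execute("Continuous Hive Read");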

Custom Data Processing

Process Hive data with custom transformations:

DataStream<RowData> hiveStream = env.fromSource(hiveSource, watermarkStrategy, "hive");

// Custom processing with DataStream API
DataStream<CustomRecord> processed = hiveStream
    .map(new HiveRowDataMapper())
    .filter(record -> record.isValid())
    .keyBy(CustomRecord::getKey)
    .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
    .aggregate(new CustomAggregator());
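
HiveRowDataMapper, CustomRecord, and CustomAggregator above are application-specific placeholders, not part of this connector. A minimal sketch of what the record and mapper might look like, assuming the projected Hive columns are (id BIGINT, category STRING, amount DOUBLE):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.RowData;

// Hypothetical record type produced from Hive rows
public class CustomRecord {
    private final long id;
    private final String key;
    private final double amount;

    public CustomRecord(long id, String key, double amount) {
        this.id = id;
        this.key = key;
        this.amount = amount;
    }

    public boolean isValid() { return key != null && !key.isEmpty(); }
    public String getKey() { return key; }
    public double getAmount() { return amount; }
}

// Hypothetical mapper converting RowData to CustomRecord by field position
public class HiveRowDataMapper implements MapFunction<RowData, CustomRecord> {
    @Override
    public CustomRecord map(RowData row) {
        return new CustomRecord(
                row.getLong(0),               // id
                row.getString(1).toString(),  // category
                row.getDouble(2));            // amount
    }
}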

Batch Processing Optimization

Optimize for large batch processing workloads:

import org.apache.flink.api.common.RuntimeExecutionMode;

// Configure for batch processing
HiveSource<RowData> batchSource = new HiveSourceBuilder()
    .setProjectedFields(requiredColumns)   // Omit setLimit for a full table scan
    .buildWithDefaultBulkFormat();

// Run the source in bounded (batch) execution mode
StreamExecutionEnvironment batchEnv = StreamExecutionEnvironment.getExecutionEnvironment();
batchEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
batchEnv.getConfig().enableObjectReuse();  // Reduce serialization overhead in batch jobs

Partition Discovery

Automatically discover and process new partitions:

// Continuous partition discovery is controlled by the Hive connector's
// streaming-source table options, set in the table DDL or as dynamic table options:
//
//   'streaming-source.enable' = 'true'            -- turn on streaming reads
//   'streaming-source.monitor-interval' = '1 min' -- how often to scan for new partitions

// A source built against such a table discovers new partitions as they arrive
HiveSource<RowData> discoverySource = new HiveSourceBuilder()
    .buildWithDefaultBulkFormat();