Table Sinks

Writing data to Hive tables, with support for partitioning, multiple file formats, and streaming ingestion with compaction, and with table metadata managed through the Hive metastore.

Capabilities

HiveTableSink

Primary table sink for writing data to Hive tables with comprehensive partitioning and format support.

/**
 * Table sink for writing data to Hive tables
 * Supports partitioning, overwrite modes, and various file formats
 */
public class HiveTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
    /**
     * Creates HiveTableSink for writing to Hive tables
     * @param conf - Flink configuration
     * @param jobConf - Hadoop job configuration  
     * @param identifier - Table identifier
     * @param catalogTable - Catalog table metadata
     * @param configuredParallelism - Configured sink parallelism (can be null)
     */
    public HiveTableSink(ReadableConfig conf, JobConf jobConf, ObjectIdentifier identifier, CatalogTable catalogTable, Integer configuredParallelism);
    
    /**
     * Get the sink runtime provider for writing data
     * @param context - Context for sink operation
     * @return SinkRuntimeProvider for data stream sink creation
     */
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    
    /**
     * Get the changelog mode supported by this sink
     * @return ChangelogMode indicating supported change types
     */
    public ChangelogMode getChangelogMode();
    
    /**
     * Copy this sink with different configuration
     * @return New HiveTableSink instance
     */
    public DynamicTableSink copy();
    
    /**
     * Get string summary of this table sink
     * @return Human-readable description
     */
    public String asSummaryString();
}
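
HiveTableSink is normally instantiated by the planner through the Hive dynamic table factory. The following is a minimal construction sketch based on the constructor documented above; catalogTable is assumed to have been loaded from a registered HiveCatalog.

// Sketch only: in normal use the planner creates the sink for you.
Configuration flinkConf = new Configuration();          // implements ReadableConfig
JobConf jobConf = new JobConf();                        // Hadoop/Hive job configuration
ObjectIdentifier identifier = new ObjectIdentifier("hive", "sales", "orders");
HiveTableSink sink = new HiveTableSink(flinkConf, jobConf, identifier, catalogTable, null);
System.out.println(sink.asSummaryString());             // human-readable summary of the sink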

Partitioning Support

Methods from the SupportsPartitioning interface for configuring partitioned writes to Hive tables.

/**
 * Apply static partition specification
 * Sets fixed partition values for all written records
 * @param partition - Map of partition key to value
 * @return New HiveTableSink with static partitioning applied
 */
public DynamicTableSink applyStaticPartition(Map<String, String> partition);

/**
 * Check if sink requires partition grouping
 * @param supportsGrouping - Whether grouping is supported by the runtime
 * @return true if partition grouping is required
 */
public boolean requiresPartitionGrouping(boolean supportsGrouping);
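
A sketch of applying a static partition, the programmatic counterpart of the SQL PARTITION clause; it assumes the sink instance from the previous section and follows the signature documented above.

Map<String, String> staticPartition = new LinkedHashMap<>();
staticPartition.put("year", "2023");
staticPartition.put("month", "12");
// Every written record goes to the year=2023/month=12 partition.
DynamicTableSink partitionedSink = sink.applyStaticPartition(staticPartition);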

Overwrite Support

Methods from the SupportsOverwrite interface for controlling whether existing data is overwritten.

/**
 * Apply overwrite mode configuration
 * Controls whether existing data should be overwritten
 * @param overwrite - true to enable overwrite mode
 * @return New DynamicTableSink with overwrite configuration
 */
public DynamicTableSink applyOverwrite(boolean overwrite);
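
A sketch of enabling overwrite mode, the programmatic counterpart of INSERT OVERWRITE, again assuming the sink from above.

// Replace any existing data in the target table or partition.
DynamicTableSink overwritingSink = sink.applyOverwrite(true);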

Writer Factory Classes

Factory classes for creating writers for different file formats and configurations.

/**
 * Factory for creating Hive bulk writers
 * Handles format-specific writer creation and configuration
 */
public class HiveBulkWriterFactory implements BulkWriter.Factory<RowData> {
    /**
     * Create HiveBulkWriterFactory for specific format
     * @param jobConf - Hadoop job configuration
     * @param tableSchema - Schema of the table
     * @param hiveShim - Hive version compatibility shim
     * @param isCompressed - Whether output should be compressed
     */
    public HiveBulkWriterFactory(JobConf jobConf, TableSchema tableSchema, HiveShim hiveShim, boolean isCompressed);
    
    /**
     * Create bulk writer for given file
     * @param out - Output stream to write to
     * @return BulkWriter instance for writing records
     * @throws IOException if writer creation fails
     */
    public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException;
}
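
A sketch of writing a record to a local file through the bulk writer factory, based on the signatures documented above; jobConf, tableSchema, hiveShim and rowData are assumed from earlier setup.

HiveBulkWriterFactory writerFactory =
    new HiveBulkWriterFactory(jobConf, tableSchema, hiveShim, /* isCompressed */ true);

FileSystem fs = FileSystem.getLocalFileSystem();
try (FSDataOutputStream out =
         fs.create(new Path("/tmp/orders/part-0"), FileSystem.WriteMode.OVERWRITE)) {
    BulkWriter<RowData> writer = writerFactory.create(out);
    writer.addElement(rowData);   // one RowData record matching the table schema
    writer.flush();
    writer.finish();              // finalize the file before closing the stream
}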

/**
 * Factory for creating Hive output formats
 * Provides MapReduce-compatible output format creation
 */
public class HiveOutputFormatFactory {
    /**
     * Create output format for Hive table
     * @param jobConf - Hadoop job configuration
     * @param catalogTable - Catalog table metadata
     * @param storageDescriptor - Hive table storage descriptor
     * @param partitionSpec - Partition specification (can be empty)
     * @param isOverwrite - Whether to overwrite existing data
     * @return OutputFormat instance for writing
     */
    public static OutputFormat<NullWritable, RowData> createOutputFormat(
        JobConf jobConf,
        CatalogTable catalogTable, 
        StorageDescriptor storageDescriptor,
        Map<String, String> partitionSpec,
        boolean isOverwrite
    );
}
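
A sketch of creating an output format for an unpartitioned, append-mode write, following the static factory signature above; storageDescriptor is assumed to come from the table's Hive metastore entry.

OutputFormat<NullWritable, RowData> outputFormat =
    HiveOutputFormatFactory.createOutputFormat(
        jobConf,
        catalogTable,
        storageDescriptor,
        Collections.emptyMap(),   // no static partition values
        false);                   // append rather than overwrite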

/**
 * Factory for creating generic Hive writers
 * Abstracts writer creation across different formats
 */
public class HiveWriterFactory implements WriterFactory<RowData> {
    /**
     * Create HiveWriterFactory with configuration
     * @param jobConf - Hadoop job configuration
     * @param catalogTable - Catalog table metadata
     * @param isOverwrite - Whether to overwrite existing data
     * @param staticPartSpec - Static partition specification
     */
    public HiveWriterFactory(JobConf jobConf, CatalogTable catalogTable, boolean isOverwrite, LinkedHashMap<String, String> staticPartSpec);
    
    /**
     * Create writer for specific partition
     * @param context - Writer context with partition info
     * @return Writer instance for the partition
     * @throws IOException if writer creation fails
     */
    public Writer<RowData> createWriter(WriterInitContext context) throws IOException;
}
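
A sketch of the record-at-a-time write path using the constructor and createWriter signatures above; writerInitContext, writerContext and rowData are assumed to be provided by the runtime.

LinkedHashMap<String, String> staticPartSpec = new LinkedHashMap<>();
staticPartSpec.put("year", "2023");

HiveWriterFactory hiveWriterFactory =
    new HiveWriterFactory(jobConf, catalogTable, /* isOverwrite */ false, staticPartSpec);
Writer<RowData> writer = hiveWriterFactory.createWriter(writerInitContext);
writer.write(rowData, writerContext);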

Streaming Sink Configuration

Helpers for configuring streaming sinks with file rolling and compaction.

/**
 * Configure streaming file sink with rolling policies
 * @param basePath - Base path for writing files
 * @param writerFactory - Factory for creating bulk writers
 * @param bucketAssigner - Function to assign records to buckets/partitions
 * @param rollingPolicy - Policy for when to roll files
 * @param outputFileConfig - Configuration for output file naming
 * @return Configured StreamingFileSink
 */
public static <T> StreamingFileSink<T> createStreamingSink(
    Path basePath,
    BulkWriter.Factory<T> writerFactory,
    BucketAssigner<T, String> bucketAssigner,
    RollingPolicy<T, String> rollingPolicy,
    OutputFileConfig outputFileConfig
);
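
A sketch of wiring the pieces together into a streaming file sink, using the helper signature above (shown unqualified) together with standard Flink bucket assigner and rolling policy implementations; dataStream and writerFactory are assumed from earlier setup.

StreamingFileSink<RowData> streamingSink = createStreamingSink(
    new Path("hdfs:///warehouse/sales.db/streaming_orders"),
    writerFactory,                                  // e.g. the HiveBulkWriterFactory from above
    new DateTimeBucketAssigner<>("yyyy-MM-dd"),     // one bucket (partition directory) per day
    OnCheckpointRollingPolicy.build(),              // roll part files on every checkpoint
    new OutputFileConfig("part-", ".orc"));

dataStream.addSink(streamingSink);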

Partition and Bucket Assignment

Classes for managing data distribution across partitions and files.

/**
 * Assigns records to partition buckets based on partition keys
 */
public class HiveRowDataPartitionComputer implements PartitionComputer<RowData> {
    /**
     * Create partition computer for Hive table
     * @param hiveShim - Hive version compatibility shim
     * @param defaultPartName - Default name for null partition values
     * @param fieldNames - Names of all table fields
     * @param fieldTypes - Types of all table fields  
     * @param partitionColumns - Names of partition columns
     */
    public HiveRowDataPartitionComputer(HiveShim hiveShim, String defaultPartName, String[] fieldNames, DataType[] fieldTypes, String[] partitionColumns);
    
    /**
     * Generate partition path for given record
     * @param in - Input record
     * @return Partition path string
     */
    public String generatePartValues(RowData in);
}
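
A sketch of computing partition values for a record, based on the constructor and generatePartValues signatures above; hiveShim and rowData are assumed from earlier setup, and the table layout is illustrative.

HiveRowDataPartitionComputer partitionComputer = new HiveRowDataPartitionComputer(
    hiveShim,
    "__HIVE_DEFAULT_PARTITION__",                   // used when a partition value is null
    new String[] {"order_id", "order_total", "partition_date"},
    new DataType[] {DataTypes.BIGINT(), DataTypes.DECIMAL(10, 2), DataTypes.STRING()},
    new String[] {"partition_date"});

String partitionPath = partitionComputer.generatePartValues(rowData);
// e.g. "partition_date=2023-12-01" for a record dated 2023-12-01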

/**
 * Legacy partition computer for Row objects
 */
public class HiveRowPartitionComputer implements PartitionComputer<Row> {
    public HiveRowPartitionComputer(HiveShim hiveShim, String defaultPartName, String[] fieldNames, TypeInformation<?>[] fieldTypes, String[] partitionColumns);
    public String generatePartValues(Row in);
}

File and Checkpoint Management

Classes for managing file lifecycle and streaming checkpoints.

/**
 * Configuration for output file naming
 */
public class OutputFileConfig {
    /**
     * Create output file configuration
     * @param partPrefix - Prefix for part files
     * @param partSuffix - Suffix for part files  
     */
    public OutputFileConfig(String partPrefix, String partSuffix);
    
    public String getPartPrefix();
    public String getPartSuffix();
}
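
A small sketch of configuring part file names, using the constructor documented above.

// Part files get the given prefix and suffix, e.g. part-<subtask>-<counter>.orc
OutputFileConfig fileConfig = new OutputFileConfig("part-", ".orc");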

/**
 * Rolling policy based on checkpoints
 * Files are rolled when checkpoints occur
 */
public class CheckpointRollingPolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
    /**
     * Check if file should be rolled
     * @param partFileState - Current state of the part file
     * @param element - Current element being processed
     * @param processingTime - Current processing time
     * @return true if file should be rolled
     */
    public boolean shouldRollOnEvent(PartFileInfo partFileState, IN element, long processingTime);
    
    /**
     * Check if file should be rolled on processing time
     * @param partFileState - Current state of the part file
     * @param processingTime - Current processing time
     * @return true if file should be rolled
     */
    public boolean shouldRollOnProcessingTime(PartFileInfo partFileState, long processingTime);
}
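
A sketch of how a sink consults the policy; rollingPolicy, partFileState and rowData are assumed to be supplied by the sink runtime, and files additionally roll on every checkpoint.

long now = System.currentTimeMillis();
boolean shouldRoll =
    rollingPolicy.shouldRollOnEvent(partFileState, rowData, now)
        || rollingPolicy.shouldRollOnProcessingTime(partFileState, now);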

Usage Examples

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

// Set up table environment with Hive catalog
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Insert data into partitioned Hive table
tableEnv.executeSql(
    "INSERT INTO hive_catalog.sales.orders " +
    "PARTITION (year='2023', month='12') " +
    "SELECT order_id, customer_id, order_total, order_date " +
    "FROM hive_catalog.staging.raw_orders " +
    "WHERE YEAR(order_date) = 2023 AND MONTH(order_date) = 12"
);

// Overwrite existing partition
tableEnv.executeSql(
    "INSERT OVERWRITE hive_catalog.sales.daily_summary " +
    "PARTITION (date_key='2023-12-01') " +
    "SELECT customer_id, SUM(order_total) as total_sales " +
    "FROM hive_catalog.sales.orders " +
    "WHERE DATE(order_date) = '2023-12-01' " +
    "GROUP BY customer_id"
);

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Set up streaming environment for continuous writing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // Enable checkpointing for file rolling
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Register Hive catalog
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/opt/hive/conf", null, "2.3.6");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Create source table from Kafka
tableEnv.executeSql(
    "CREATE TABLE kafka_orders (" +
    "  order_id BIGINT," +
    "  customer_id BIGINT," +
    "  order_total DECIMAL(10,2)," +
    "  order_time TIMESTAMP(3)," +
    "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")"
);

// Stream data to partitioned Hive table
tableEnv.executeSql(
    "INSERT INTO hive_catalog.sales.streaming_orders " +
    "SELECT " +
    "  order_id," +
    "  customer_id," +
    "  order_total," +
    "  order_time," +
    "  DATE_FORMAT(order_time, 'yyyy-MM-dd') as partition_date " +
    "FROM kafka_orders"
);

// executeSql(...) submits and runs the streaming INSERT job; no separate env.execute() call is needed.

Types

public interface PartitionComputer<T> {
    /**
     * Generate partition values for a record
     * @param record - Input record to compute partition for
     * @return Partition path string
     */
    String generatePartValues(T record);
}
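
A minimal sketch of a custom PartitionComputer for a hypothetical Order POJO; Order and getOrderDate() are illustrative only.

public class OrderDatePartitionComputer implements PartitionComputer<Order> {
    @Override
    public String generatePartValues(Order record) {
        // Single-level partition path, e.g. "order_date=2023-12-01"
        return "order_date=" + record.getOrderDate();
    }
}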

public interface BucketAssigner<IN, BucketID> {
    /**
     * Assign record to a bucket
     * @param element - Input element
     * @param context - Processing context
     * @return Bucket identifier
     */
    BucketID getBucketId(IN element, Context context);
}

public interface WriterFactory<IN> {
    /**
     * Create writer for given context
     * @param context - Writer initialization context
     * @return Writer instance
     * @throws IOException if creation fails
     */
    Writer<IN> createWriter(WriterInitContext context) throws IOException;
}

public interface Writer<IN> {
    /**
     * Write element to output
     * @param element - Element to write
     * @param context - Processing context
     * @throws IOException if write fails
     */
    void write(IN element, Context context) throws IOException, InterruptedException;
}

public class ObjectIdentifier {
    public ObjectIdentifier(String catalogName, String databaseName, String objectName);
    public String getCatalogName();
    public String getDatabaseName(); 
    public String getObjectName();
}

public interface SinkRuntimeProvider extends DynamicTableSink.RuntimeProvider {
    // Marker interface for sink providers
}
