tessl/maven-org-apache-flink--flink-sql-orc-2-12

Apache Flink SQL ORC format connector providing comprehensive support for reading and writing ORC files in Flink's Table API and SQL environments.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-sql-orc_2.12@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-orc-2-12@1.14.0


Apache Flink SQL ORC Format Connector

The Apache Flink SQL ORC format connector provides comprehensive support for reading and writing ORC (Optimized Row Columnar) files within Flink's Table API and SQL environments. The package is a shaded JAR that bundles the core flink-orc functionality together with all dependencies needed for ORC file format integration in distributed processing environments.

Package Information

  • Package Name: flink-sql-orc_2.12
  • Package Type: maven
  • Language: Java
  • Installation: Add to Maven dependencies: org.apache.flink:flink-sql-orc_2.12:1.14.6

Core Imports

import org.apache.flink.orc.OrcFileFormatFactory;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;

For specific functionality:

// SQL/Table API format factory
import org.apache.flink.orc.OrcFileFormatFactory;

// DataStream API writing
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;

// Filter pushdown
import org.apache.flink.orc.OrcFilters;

// Input formats and readers
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.AbstractOrcFileInputFormat;

Basic Usage

SQL/Table API Integration

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Create table with ORC format
tEnv.executeSql(
    "CREATE TABLE orc_table (" +
    "  user_id BIGINT," +
    "  item_id BIGINT," +
    "  category_id BIGINT," +
    "  behavior STRING," +
    "  ts TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'file:///path/to/orc/files'," +
    "  'format' = 'orc'" +
    ")"
);

// Query the ORC table
tEnv.executeSql("SELECT user_id, COUNT(*) FROM orc_table GROUP BY user_id").print();

DataStream API Writing

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

// Create vectorizer for schema
String schema = "struct<user_id:bigint,item_id:bigint,behavior:varchar(50)>";
RowDataVectorizer vectorizer = new RowDataVectorizer(schema, new LogicalType[]{
    new BigIntType(), new BigIntType(), new VarCharType(50)
});

// Create ORC writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Create file sink
StreamingFileSink<RowData> sink = StreamingFileSink
    .forBulkFormat(new Path("file:///path/to/output"), writerFactory)
    .build();

// Write data stream to ORC files
DataStream<RowData> dataStream = // ... your data stream
dataStream.addSink(sink);
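Bulk formats such as ORC roll part files on checkpoints, so in-progress files are only finalized once checkpointing is enabled; a minimal sketch, assuming env is the StreamExecutionEnvironment driving this pipeline:

// Enable checkpointing so the bulk ORC writer can finalize part files
// (bulk-format sinks roll part files on checkpoint)
env.enableCheckpointing(60_000);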

Architecture

The ORC format connector is built around several key architectural components:

  • Format Integration Layer: OrcFileFormatFactory provides seamless integration with Flink's SQL/Table API format system
  • Vectorized Processing: High-performance columnar processing through VectorizedRowBatch and column vector abstractions
  • Version Compatibility: Shim layer (OrcShim) ensures compatibility across different Hive/ORC versions (2.0.x, 2.1.x, 2.3+)
  • Filter Pushdown: Optimized query performance through predicate pushdown to ORC reader level
  • Partition Support: Native support for partitioned tables and partition pruning
  • Bulk Processing: Efficient batch reading and writing optimized for large-scale data processing

Capabilities

SQL and Table API Integration

Complete integration with Flink's SQL engine and Table API, providing declarative ORC file access through DDL statements and programmatic table definitions.

public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";
    
    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
    public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}
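The same table can also be declared programmatically rather than through DDL; a minimal sketch using the Table API TableDescriptor, with an abbreviated schema and a placeholder path:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Programmatic equivalent of the CREATE TABLE DDL: filesystem connector + 'orc' format
tEnv.createTemporaryTable("orc_table_programmatic", TableDescriptor
    .forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("behavior", DataTypes.STRING())
        .build())
    .option("path", "file:///path/to/orc/files")
    .format("orc")
    .build());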

Detailed documentation: docs/table-api.md

DataStream API Integration

Direct integration with Flink's DataStream API for programmatic ORC reading and writing, supporting custom data processing pipelines and streaming applications.

public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration hadoopConfiguration);
    
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

public abstract class Vectorizer<T> {
    public Vectorizer(String schema);
    public TypeDescription getSchema();
    public void addUserMetadata(String key, ByteBuffer value);
    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
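As an illustration, a minimal sketch of a custom Vectorizer for a hypothetical Person(name, age) type, mapping each element onto the next row of the VectorizedRowBatch:

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Hypothetical element type used only for this sketch
class Person {
    final String name;
    final int age;
    Person(String name, int age) { this.name = name; this.age = age; }
}

// Converts one Person into one row of the ORC batch
class PersonVectorizer extends Vectorizer<Person> {
    PersonVectorizer() {
        super("struct<name:string,age:int>");
    }

    @Override
    public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
        int row = batch.size++;  // claim the next row in the batch
        BytesColumnVector nameCol = (BytesColumnVector) batch.cols[0];
        LongColumnVector ageCol = (LongColumnVector) batch.cols[1];
        nameCol.setVal(row, element.name.getBytes(StandardCharsets.UTF_8));
        ageCol.vector[row] = element.age;
    }
}

The matching writer factory is then created with new OrcBulkWriterFactory<>(new PersonVectorizer()).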

Detailed documentation: docs/datastream-api.md

Configuration and Advanced Features

Comprehensive configuration options for performance tuning, compression settings, and advanced ORC features including custom filters and metadata handling.
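For example, compression can be set either as an 'orc.*' table option in SQL or through writer properties handed to OrcBulkWriterFactory (the Properties-based constructor shown under DataStream API Integration above); a minimal sketch, reusing the vectorizer from the DataStream example:

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;

// SQL/Table API: ORC writer keys pass through the format options, e.g.
//   'orc.compress' = 'SNAPPY' in the WITH clause of the CREATE TABLE statement.

// DataStream API: the same keys go into the writer Properties
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");

OrcBulkWriterFactory<RowData> compressedFactory =
    new OrcBulkWriterFactory<>(vectorizer, writerProps, new Configuration());

The filter predicate classes below back the connector's filter pushdown support.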

public class OrcFilters {
    public static Predicate toOrcPredicate(Expression expression);
    
    public abstract static class Predicate implements Serializable { }
    public static class Equals extends BinaryPredicate { }
    public static class LessThan extends BinaryPredicate { }
    public static class LessThanEquals extends BinaryPredicate { }
    public static class IsNull extends ColumnPredicate { }
    public static class Between extends ColumnPredicate { }
    public static class In extends ColumnPredicate { }
}
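A minimal sketch of building pushdown predicates by hand (they are normally derived from SQL WHERE clauses automatically, but can also be passed to the input formats as conjunctPredicates); the constructor arguments shown here, a column name, a PredicateLeaf.Type, and literal values, are assumptions:

import java.util.Arrays;
import java.util.List;
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// Two predicates combined as a conjunction by the reader:
// behavior = 'buy' AND user_id < 1000
List<OrcFilters.Predicate> conjunctPredicates = Arrays.asList(
    new OrcFilters.Equals("behavior", PredicateLeaf.Type.STRING, "buy"),
    new OrcFilters.LessThan("user_id", PredicateLeaf.Type.LONG, 1000L));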

Detailed documentation: docs/configuration.md

Types

Core type definitions used throughout the ORC connector:

// Input format for reading ORC files
public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT> implements BulkFormat<T, SplitT> {
    public AbstractOrcFileInputFormat(
        Path[] filePaths,
        TypeDescription schema,
        int[] selectedFields,
        List<Predicate> conjunctPredicates,
        int batchSize,
        Configuration orcConfig,
        SerializableHadoopConfigWrapper hadoopConfigWrapper
    );
    
    public boolean isSplittable();
    public abstract TypeInformation<T> getProducedType();
}

// Concrete implementation for RowData
public class OrcColumnarRowFileInputFormat<BatchT, SplitT> extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
    public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT> 
        createPartitionedFormat(
            Configuration orcConfig,
            RowType tableType,
            SerializableHadoopConfigWrapper hadoopConfigWrapper,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor,
            List<Predicate> conjunctPredicates,
            int batchSize,
            boolean caseSensitive
        );
}

// Version compatibility interface
public interface OrcShim<BATCH> extends Serializable {
    RecordReader createRecordReader(
        Configuration conf, 
        TypeDescription schema, 
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        org.apache.flink.core.fs.Path path, 
        long splitStart, 
        long splitLength
    ) throws IOException;
    OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);
    boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;
    
    static OrcShim<VectorizedRowBatch> defaultShim();
    static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);
}