tessl/maven-org-apache-flink--flink-orc

Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files

Describes
pkg:maven/org.apache.flink/flink-orc@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-orc@2.1.0


Apache Flink ORC Format Connector

The Apache Flink ORC format connector provides support for reading and writing ORC (Optimized Row Columnar) data files within the Flink ecosystem. The library enables high-performance columnar data processing through vectorized reading, predicate pushdown, Table API integration, and complete data type mapping between Flink and ORC.

Package Information

  • Package Name: flink-orc
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Artifact ID: flink-orc
  • Version: 2.1.0
  • Installation: Add the dependency to your pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc</artifactId>
    <version>2.1.0</version>
</dependency>

Core Imports

// Table API - main format factory
import org.apache.flink.orc.OrcFileFormatFactory;

// Writer API
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.vector.RowDataVectorizer;

// Reader API
import org.apache.flink.orc.OrcColumnarRowInputFormat;

// Filtering
import org.apache.flink.orc.OrcFilters;

Basic Usage

Table API Integration

The ORC format integrates seamlessly with Flink's Table API through the format identifier "orc":

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Create table with ORC format
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

tableEnv.executeSql(
    "CREATE TABLE users (" +
    "  id BIGINT," +
    "  name STRING," +
    "  age INT," +
    "  active BOOLEAN" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/files'," +
    "  'format' = 'orc'" +
    ")"
);

// Query ORC data
Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE active = true");
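
Format options can be tuned in the same WITH clause. Keys prefixed with "orc." are typically forwarded to the underlying ORC writer; this passthrough behavior and the 'orc.compress' key below are assumptions worth verifying against your Flink version:

// Hedged sketch: 'orc.compress' is a standard ORC writer property, assumed here
// to be forwarded by the format when prefixed with "orc."
tableEnv.executeSql(
    "CREATE TABLE users_compressed (" +
    "  id BIGINT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/files'," +
    "  'format' = 'orc'," +
    "  'orc.compress' = 'ZLIB'" +
    ")"
);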

DataStream API - Writing ORC Files

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

// Create vectorizer for RowData
LogicalType[] fieldTypes = {
    new BigIntType(),
    new VarCharType(255),
    new IntType(),
    new BooleanType()
};

RowDataVectorizer vectorizer = new RowDataVectorizer(
    "struct<id:bigint,name:string,age:int,active:boolean>",
    fieldTypes
);

// Create ORC writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Use in a sink (Flink 2.x uses FileSink; the legacy StreamingFileSink was removed)
DataStream<RowData> dataStream = // ... your data stream
dataStream.sinkTo(
    FileSink.forBulkFormat(
        new Path("/path/to/output"),
        writerFactory
    ).build()
);
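
The factory also accepts ORC writer properties and a Hadoop Configuration through its overloaded constructors (listed under Bulk Writing below). A minimal sketch, assuming the standard ORC property keys orc.compress and orc.stripe.size:

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;

// ORC writer properties: these keys come from ORC itself, not from Flink.
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "ZLIB");
writerProps.setProperty("orc.stripe.size", String.valueOf(64L * 1024 * 1024));

OrcBulkWriterFactory<RowData> tunedWriterFactory =
    new OrcBulkWriterFactory<>(vectorizer, writerProps, new Configuration());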

Architecture

The Flink ORC connector is organized into several key components:

  • Format Factory: Central integration point with Flink's Table API (OrcFileFormatFactory)
  • Reader Layer: Vectorized columnar reading with predicate pushdown (OrcColumnarRowInputFormat)
  • Writer Layer: Bulk writing with custom vectorization support (OrcBulkWriterFactory, Vectorizer)
  • Vector System: Column vector implementations for all supported data types
  • Filter System: Predicate pushdown with ORC-native filtering (OrcFilters)
  • Type System: Complete mapping between Flink and ORC type systems
  • Utility Layer: Statistics, configuration, and compatibility utilities

Capabilities

Table API Integration

Main integration point for ORC format in Flink's Table API and SQL.

public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";
    
    public String factoryIdentifier();
    public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}

See docs/table-api.md for details.

Bulk Writing

High-performance bulk writing of data to ORC files with custom vectorization.

@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);
    
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
    public Vectorizer(String schema);
    public TypeDescription getSchema();
    public void addUserMetadata(String key, ByteBuffer value);
    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
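
For element types other than RowData, you subclass Vectorizer and fill the Hive VectorizedRowBatch yourself. A minimal sketch for a hypothetical Person POJO (the Person class and its getters are illustrative, not part of the library):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class PersonVectorizer extends Vectorizer<Person> {

    public PersonVectorizer(String schema) {
        super(schema);
    }

    @Override
    public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
        BytesColumnVector nameVector = (BytesColumnVector) batch.cols[0];
        LongColumnVector ageVector = (LongColumnVector) batch.cols[1];
        // Append one row to the current batch.
        int row = batch.size++;
        nameVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
        ageVector.vector[row] = element.getAge();
    }
}

// Usage: new OrcBulkWriterFactory<>(new PersonVectorizer("struct<name:string,age:int>"))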

See docs/bulk-writing.md for details.

Columnar Reading

Vectorized columnar reading with partition support and statistics reporting.

public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit> 
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> 
        implements FileBasedStatisticsReportableInputFormat {
            
    public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> 
        createPartitionedFormat(
            OrcShim<VectorizedRowBatch> shim,
            Configuration hadoopConfig,
            RowType tableType,
            List<String> partitionKeys,
            PartitionFieldExtractor<SplitT> extractor,
            int[] selectedFields,
            List<OrcFilters.Predicate> conjunctPredicates,
            int batchSize,
            Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory
        );
    
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
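
A hedged sketch of wiring the format into a FileSource for the DataStream API; OrcShim.defaultShim(), PartitionFieldExtractor.forFileSystem(...), and the default partition name used below are assumptions based on current Flink releases:

import java.util.Collections;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.table.PartitionFieldExtractor;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

RowType tableType = RowType.of(
    new LogicalType[] {new BigIntType(), new VarCharType(255), new IntType()},
    new String[] {"id", "name", "age"});

OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> inputFormat =
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),                       // assumed default shim factory
        new Configuration(),                         // Hadoop configuration
        tableType,
        Collections.emptyList(),                     // no partition keys
        PartitionFieldExtractor.forFileSystem("__DEFAULT_PARTITION__"),
        new int[] {0, 1, 2},                         // project all columns
        Collections.emptyList(),                     // no pushed-down predicates
        2048,                                        // batch size
        InternalTypeInfo::of);

FileSource<RowData> source =
    FileSource.forBulkFileFormat(inputFormat, new Path("/path/to/orc/files")).build();

// env.fromSource(source, WatermarkStrategy.noWatermarks(), "orc-source");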

See docs/columnar-reading.md for details.

Predicate Pushdown

Advanced filtering capabilities with ORC-native predicate pushdown.

public class OrcFilters {
    public static Predicate toOrcPredicate(Expression expression);
    
    public abstract static class Predicate implements Serializable {
        public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
    }
}
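
Predicates are usually derived from Table API filter expressions via toOrcPredicate, but they can also be built directly. The concrete subclasses used below (OrcFilters.Equals, OrcFilters.LessThan, OrcFilters.Not) are assumed to be available as in current flink-orc releases:

import java.util.Arrays;
import java.util.List;
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// active = true AND NOT (age < 18); each list entry is AND-ed together
// when passed to the reader.
OrcFilters.Predicate activeFilter =
    new OrcFilters.Equals("active", PredicateLeaf.Type.BOOLEAN, true);
OrcFilters.Predicate adultFilter =
    new OrcFilters.Not(new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 18L));

// Pass as the conjunctPredicates argument of
// OrcColumnarRowInputFormat.createPartitionedFormat(...).
List<OrcFilters.Predicate> conjunctPredicates = Arrays.asList(activeFilter, adultFilter);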

See docs/predicate-pushdown.md for details.

Vector Processing

Low-level column vector system for high-performance data processing.

public abstract class AbstractOrcColumnVector {
    public static ColumnVector createFlinkVector(org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector, LogicalType type);
    public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);
}
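
For example, the reader backs partition columns with constant vectors whose value is fixed for a whole batch. A minimal sketch, assuming the Flink-side vector interfaces live under org.apache.flink.table.data.columnar.vector as in recent releases:

import org.apache.flink.orc.vector.AbstractOrcColumnVector;
import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.IntColumnVector;
import org.apache.flink.table.types.logical.IntType;

// A vector in which every row carries the same value, e.g. a partition column.
ColumnVector yearPartition =
    AbstractOrcColumnVector.createFlinkVectorFromConstant(new IntType(), 2024, 1024);
int firstRowValue = ((IntColumnVector) yearPartition).getInt(0); // 2024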

See docs/vector-processing.md for details.

Types

Core Types

// Vectorizer for RowData
public class RowDataVectorizer extends Vectorizer<RowData> {
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);
    public void vectorize(RowData row, VectorizedRowBatch batch);
}

// Bulk writer implementation
@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}

// Statistics reporting utility
public class OrcFormatStatisticsReportUtil {
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType);
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType, Configuration hadoopConfig);
}

// Configuration wrapper for serialization
public class SerializableHadoopConfigWrapper implements Serializable {
    public SerializableHadoopConfigWrapper(Configuration configuration);
    public Configuration get();
}
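
A sketch of reporting statistics from a set of ORC files; the utility's package (org.apache.flink.orc.util) and the sample file name are assumptions:

import java.util.Collections;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.util.OrcFormatStatisticsReportUtil;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.DataType;

DataType producedType = DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.BIGINT()),
    DataTypes.FIELD("name", DataTypes.STRING()));

// Row count (and, when available, column statistics) read from ORC file footers.
TableStats stats = OrcFormatStatisticsReportUtil.getTableStatistics(
    Collections.singletonList(new Path("/path/to/orc/files/part-0.orc")),
    producedType);
long rowCount = stats.getRowCount();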

Supported Data Types

The connector supports comprehensive type mapping between Flink and ORC:

  • Primitive Types: BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE
  • String Types: CHAR, VARCHAR
  • Binary Types: BINARY, VARBINARY
  • Decimal Types: DECIMAL with precision and scale
  • Temporal Types: DATE, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE
  • Complex Types: ARRAY, MAP, ROW (nested structures)
  • Null Handling: Full null value support across all types
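
For instance, a table covering several of these categories can be declared directly in SQL (the table and column names below are illustrative):

tableEnv.executeSql(
    "CREATE TABLE events (" +
    "  id BIGINT," +
    "  amount DECIMAL(10, 2)," +
    "  created_at TIMESTAMP(3)," +
    "  event_date DATE," +
    "  payload VARBINARY," +
    "  tags ARRAY<STRING>," +
    "  attributes MAP<STRING, STRING>," +
    "  address ROW<street STRING, zip INT>" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/files'," +
    "  'format' = 'orc'" +
    ")"
);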