# Apache Flink ORC Format Connector

Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files.

## Installation

```bash
npx @tessl/cli install tessl/maven-org-apache-flink--flink-orc@2.1.0
```

The connector provides comprehensive support for reading and writing ORC files within the Flink ecosystem, enabling high-performance columnar processing with vectorized reading, predicate pushdown, Table API integration, and full Flink-to-ORC data type mapping.
Or add the dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
// Table API - main format factory
import org.apache.flink.orc.OrcFileFormatFactory;
// Writer API
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.vector.RowDataVectorizer;
// Reader API
import org.apache.flink.orc.OrcColumnarRowInputFormat;
// Filtering
import org.apache.flink.orc.OrcFilters;
```

## Table API Usage

The ORC format integrates with Flink's Table API through the format identifier `orc`:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Create table with ORC format
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.executeSql(
    "CREATE TABLE users (" +
    "  id BIGINT," +
    "  name STRING," +
    "  age INT," +
    "  active BOOLEAN" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/files'," +
    "  'format' = 'orc'" +
    ")"
);
// Query ORC data
Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE active = true");
```
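Flink's filesystem connector also forwards table options prefixed with `orc.` to the ORC writer, so standard ORC properties can be configured per table. A minimal sketch, using the stock ORC property name `orc.compress` (not something defined by this connector):

```java
// ORC writer properties can be passed through the WITH clause
// using the 'orc.' prefix; SNAPPY compression is shown as an example.
tableEnv.executeSql(
    "CREATE TABLE users_snappy (" +
    "  id BIGINT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/files'," +
    "  'format' = 'orc'," +
    "  'orc.compress' = 'SNAPPY'" +
    ")"
);
```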
## Bulk Writer Usage (DataStream API)

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
// Create vectorizer for RowData
LogicalType[] fieldTypes = {
    new BigIntType(),
    new VarCharType(255),
    new IntType(),
    new BooleanType()
};
RowDataVectorizer vectorizer = new RowDataVectorizer(
    "struct<id:bigint,name:string,age:int,active:boolean>",
    fieldTypes
);
// Create ORC writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);
// Use in sink
DataStream<RowData> dataStream = ...; // your data stream

// FileSink replaces StreamingFileSink, which was removed in Flink 2.x
dataStream.sinkTo(
    FileSink.forBulkFormat(
        new Path("/path/to/output"),
        writerFactory
    ).build()
);
```

## Architecture

The Flink ORC connector is organized into several key components:
- Table API integration (`OrcFileFormatFactory`)
- Vectorized reading (`OrcColumnarRowInputFormat`)
- Bulk writing (`OrcBulkWriterFactory`, `Vectorizer`)
- Predicate pushdown (`OrcFilters`)

### OrcFileFormatFactory

Main integration point for the ORC format in Flink's Table API and SQL.

```java
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";

    public String factoryIdentifier();
    public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}
```
### OrcBulkWriterFactory

High-performance bulk writing of data to ORC files with custom vectorization.

```java
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);

    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
```
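The `Properties` overload accepts standard ORC writer keys. A short sketch, assuming the stock ORC property names `orc.compress` and `orc.stripe.size` rather than anything defined by this connector (it reuses the `vectorizer` from the writer example above):

```java
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;

// Tune the underlying ORC writer via standard ORC properties.
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");      // compression codec
writerProps.setProperty("orc.stripe.size", "67108864"); // 64 MB stripes

OrcBulkWriterFactory<RowData> tunedFactory =
        new OrcBulkWriterFactory<>(vectorizer, writerProps, new Configuration());
```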
### Vectorizer

```java
@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
    public Vectorizer(String schema);

    public TypeDescription getSchema();
    public void addUserMetadata(String key, ByteBuffer value);

    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
```
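Custom element types are handled by subclassing `Vectorizer` and appending each element to the ORC `VectorizedRowBatch`. A minimal sketch for a hypothetical `Person` POJO (the class and its fields are illustrative):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Hypothetical element type, used for illustration only.
class Person {
    long id;
    String name;
}

class PersonVectorizer extends Vectorizer<Person> {
    PersonVectorizer() {
        super("struct<id:bigint,name:string>");
    }

    @Override
    public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
        int row = batch.size++; // append at the next free row of the batch
        ((LongColumnVector) batch.cols[0]).vector[row] = element.id;
        ((BytesColumnVector) batch.cols[1])
                .setVal(row, element.name.getBytes(StandardCharsets.UTF_8));
    }
}
```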
### OrcColumnarRowInputFormat

Vectorized columnar reading with partition support and statistics reporting.

```java
public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT>
        implements FileBasedStatisticsReportableInputFormat {

    public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT>
            createPartitionedFormat(
                    OrcShim<VectorizedRowBatch> shim,
                    Configuration hadoopConfig,
                    RowType tableType,
                    List<String> partitionKeys,
                    PartitionFieldExtractor<SplitT> extractor,
                    int[] selectedFields,
                    List<OrcFilters.Predicate> conjunctPredicates,
                    int batchSize,
                    Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory);

    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```
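A hedged sketch of wiring the format into a `FileSource` for an unpartitioned table. `OrcShim.defaultShim()` and `InternalTypeInfo.of` exist in Flink, but passing `null` for the partition extractor when there are no partition keys is an assumption to verify against your version:

```java
import java.util.Collections;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

RowType tableType = RowType.of(
        new LogicalType[] {new BigIntType(), new VarCharType(255)},
        new String[] {"id", "name"});

OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> format =
        OrcColumnarRowInputFormat.createPartitionedFormat(
                OrcShim.defaultShim(),
                new Configuration(),
                tableType,
                Collections.emptyList(),  // no partition keys
                null,                     // no partition field extractor (assumed OK here)
                new int[] {0, 1},         // project both columns
                Collections.emptyList(),  // no pushed-down predicates
                1024,                     // rows per vectorized batch
                InternalTypeInfo::of);    // TypeInformation<RowData> factory

FileSource<RowData> source =
        FileSource.forBulkFileFormat(format, new Path("/path/to/orc/files")).build();
```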
### OrcFilters

Advanced filtering with ORC-native predicate pushdown.

```java
public class OrcFilters {
    public static Predicate toOrcPredicate(Expression expression);

    public abstract static class Predicate implements Serializable {
        public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
    }
}
```
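Predicates for the `conjunctPredicates` argument of `createPartitionedFormat` can be built from the nested predicate classes. A sketch; `OrcFilters.Equals` and `OrcFilters.LessThan` exist in the connector, but treat the exact constructor signatures shown here as assumptions to check against your version:

```java
import java.util.Arrays;
import java.util.List;
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// WHERE age < 30 AND active = true, expressed as conjunctive predicates
// that the reader pushes down into the ORC SearchArgument.
List<OrcFilters.Predicate> conjunctPredicates = Arrays.asList(
        new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 30L),
        new OrcFilters.Equals("active", PredicateLeaf.Type.BOOLEAN, true));
```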
### AbstractOrcColumnVector

Low-level column vector system for high-performance data processing.

```java
public abstract class AbstractOrcColumnVector {
    public static ColumnVector createFlinkVector(org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector, LogicalType type);
    public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);
}
```

### Supporting Classes

```java
// Vectorizer for RowData
public class RowDataVectorizer extends Vectorizer<RowData> {
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);
    public void vectorize(RowData row, VectorizedRowBatch batch);
}

// Bulk writer implementation
@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}

// Statistics reporting utility
public class OrcFormatStatisticsReportUtil {
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType);
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType, Configuration hadoopConfig);
}

// Configuration wrapper for serialization
public class SerializableHadoopConfigWrapper implements Serializable {
    public SerializableHadoopConfigWrapper(Configuration configuration);
    public Configuration get();
}
```
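`SerializableHadoopConfigWrapper` exists because Hadoop's `Configuration` is not `Serializable`, yet it must ship with operators to the task managers. A small usage sketch (the import path is an assumption):

```java
import org.apache.flink.orc.util.SerializableHadoopConfigWrapper; // package assumed
import org.apache.hadoop.conf.Configuration;

Configuration hadoopConf = new Configuration();
hadoopConf.set("orc.compress", "ZLIB");

// Wrap for Java serialization; unwrap on the task side.
SerializableHadoopConfigWrapper wrapper = new SerializableHadoopConfigWrapper(hadoopConf);
Configuration restored = wrapper.get();
```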
## Data Type Mapping

The connector supports comprehensive type mapping between Flink and ORC:

- Numeric and boolean: `BOOLEAN`, `TINYINT`, `SMALLINT`, `INTEGER`, `BIGINT`, `FLOAT`, `DOUBLE`
- Character: `CHAR`, `VARCHAR`
- Binary: `BINARY`, `VARBINARY`
- Decimal: `DECIMAL` with precision and scale
- Date/time: `DATE`, `TIMESTAMP_WITHOUT_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`
- Complex: `ARRAY`, `MAP`, `ROW` (nested structures)
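The writer example above shows how these mappings surface in practice: the ORC schema string mirrors the Flink `LogicalType`s field by field. A short sketch mixing several categories:

```java
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.table.types.logical.*;

// Flink logical types and the corresponding ORC struct schema string.
LogicalType[] fieldTypes = {
    new DecimalType(10, 2),       // DECIMAL(10, 2) -> decimal(10,2)
    new DateType(),               // DATE           -> date
    new ArrayType(new IntType())  // ARRAY<INT>     -> array<int>
};
RowDataVectorizer mixedTypesVectorizer = new RowDataVectorizer(
    "struct<price:decimal(10,2),day:date,scores:array<int>>",
    fieldTypes);
```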