Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files
The Apache Flink ORC format connector reads and writes ORC (Optimized Row Columnar) data files in Flink. It supports vectorized (columnar batch) reading, predicate pushdown into the ORC reader, integration with the Table API and SQL, and mapping between Flink and ORC data types.
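Predicate pushdown works because ORC files keep min/max statistics for each column at the file, stripe, and row-group level. A reader can skip any stripe whose statistics prove that no row satisfies the filter. The stdlib-only Java sketch below models that decision for filters of the form `column > threshold` and `column = value`. `StripeSkippingSketch`, `StripeStats`, and the sample values are hypothetical illustrations, not part of the Flink or ORC API.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, stdlib-only model of statistics-based stripe skipping.
 * Not part of the Flink or ORC API.
 */
final class StripeSkippingSketch {

    /** Min/max statistics of one integer column within one stripe (hypothetical type). */
    static final class StripeStats {
        final long min;
        final long max;

        StripeStats(long min, long max) {
            this.min = min;
            this.max = max;
        }
    }

    /** For "column > threshold", a stripe can hold a match only if its max exceeds the threshold. */
    static boolean mightMatchGreaterThan(StripeStats stats, long threshold) {
        return stats.max > threshold;
    }

    /** For "column = value", a stripe can hold a match only if value lies within [min, max]. */
    static boolean mightMatchEquals(StripeStats stats, long value) {
        return stats.min <= value && value <= stats.max;
    }

    /** Returns the indexes of stripes that still have to be read for "column > threshold". */
    static List<Integer> stripesToRead(List<StripeStats> stripes, long threshold) {
        List<Integer> selected = new ArrayList<>();
        for (int i = 0; i < stripes.size(); i++) {
            if (mightMatchGreaterThan(stripes.get(i), threshold)) {
                selected.add(i);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<StripeStats> ageStats = new ArrayList<>();
        ageStats.add(new StripeStats(18, 25)); // max is 25: no row can satisfy age > 30
        ageStats.add(new StripeStats(22, 41)); // may contain matching rows
        ageStats.add(new StripeStats(35, 60)); // may contain matching rows
        System.out.println(stripesToRead(ageStats, 30)); // prints [1, 2]
    }
}
```

This kind of skipping is coarse-grained: a stripe that survives can still contain rows that fail the filter, so the rows that are read still have to be filtered.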
pom.xml:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc</artifactId>
    <version>2.1.0</version>
</dependency>
```

```java
// Table API - main format factory
import org.apache.flink.orc.OrcFileFormatFactory;
// Writer API
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.vector.RowDataVectorizer;
// Reader API
import org.apache.flink.orc.OrcColumnarRowInputFormat;
// Filtering
import org.apache.flink.orc.OrcFilters;
```

The ORC format integrates with Flink's Table API and SQL through the format identifier "orc":
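```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Create a table backed by ORC files
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tableEnv.executeSql(
    "CREATE TABLE users (" +
    "  id BIGINT," +
    "  name STRING," +
    "  age INT," +
    "  active BOOLEAN" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/files'," +
    "  'format' = 'orc'" +
    ")"
);
// Query ORC data
Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE active = true");
```

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

public class OrcSinkExample {

    // Writes an existing DataStream<RowData> (built elsewhere in the job) as ORC files
    static void writeAsOrc(DataStream<RowData> dataStream) {
        // Flink field types, in the same order as the fields of the ORC schema below
        LogicalType[] fieldTypes = {
            new BigIntType(),
            new VarCharType(255),
            new IntType(),
            new BooleanType()
        };

        // Copies each RowData into the column vectors of an ORC row batch
        RowDataVectorizer vectorizer = new RowDataVectorizer(
            "struct<id:bigint,name:string,age:int,active:boolean>",
            fieldTypes
        );

        // ORC writer factory with default writer properties and Hadoop configuration
        OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

        // FileSink replaces the deprecated StreamingFileSink; bulk-format part files
        // are rolled on each checkpoint, so enable checkpointing in streaming jobs
        dataStream.sinkTo(
            FileSink.forBulkFormat(new Path("/path/to/output"), writerFactory).build()
        );
    }
}
```

The Flink ORC connector is organized into several key components: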
- Table API and SQL format factory (OrcFileFormatFactory)
- Columnar reader (OrcColumnarRowInputFormat)
- Bulk writer (OrcBulkWriterFactory, Vectorizer)
- Predicate pushdown (OrcFilters)

OrcFileFormatFactory is the main integration point for the ORC format in Flink's Table API and SQL.

```java
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";
    public String factoryIdentifier();
    public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}
```

OrcBulkWriterFactory provides high-performance bulk writing of data to ORC files, with a user-supplied Vectorizer converting records into ORC column batches.
```java
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
```

```java
@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
    public Vectorizer(String schema);
    public TypeDescription getSchema();
    public void addUserMetadata(String key, ByteBuffer value);
    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
```

Vectorized columnar reading with partition support and statistics reporting.
```java
public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT>
        implements FileBasedStatisticsReportableInputFormat {
    public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT>
            createPartitionedFormat(
                    OrcShim<VectorizedRowBatch> shim,
                    Configuration hadoopConfig,
                    RowType tableType,
                    List<String> partitionKeys,
                    PartitionFieldExtractor<SplitT> extractor,
                    int[] selectedFields,
                    List<OrcFilters.Predicate> conjunctPredicates,
                    int batchSize,
                    Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory
            );
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```

Advanced filtering capabilities with ORC-native predicate pushdown.
```java
public class OrcFilters {
    public static Predicate toOrcPredicate(Expression expression);
    public abstract static class Predicate implements Serializable {
        public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
    }
}
```

Low-level column vector system for high-performance data processing.
```java
public abstract class AbstractOrcColumnVector {
    public static ColumnVector createFlinkVector(org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector, LogicalType type);
    public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);
}
```

```java
// Vectorizer for RowData
public class RowDataVectorizer extends Vectorizer<RowData> {
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);
    public void vectorize(RowData row, VectorizedRowBatch batch);
}

// Bulk writer implementation
@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}

// Statistics reporting utility
public class OrcFormatStatisticsReportUtil {
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType);
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType, Configuration hadoopConfig);
}

// Configuration wrapper for serialization
public class SerializableHadoopConfigWrapper implements Serializable {
    public SerializableHadoopConfigWrapper(Configuration configuration);
    public Configuration get();
}
```

The connector supports comprehensive type mapping between Flink and ORC:
- Boolean and numeric: BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE
- Character strings: CHAR, VARCHAR
- Binary: BINARY, VARBINARY
- Decimal: DECIMAL with precision and scale
- Date and time: DATE, TIMESTAMP_WITHOUT_TIME_ZONE, TIMESTAMP_WITH_LOCAL_TIME_ZONE
- Composite: ARRAY, MAP, ROW (nested structures)
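To show how the scalar types above line up with ORC type names, the sketch below builds an ORC struct schema string, the same form passed to RowDataVectorizer, from Flink SQL type names. `OrcSchemaSketch` and its methods are hypothetical helpers that work on plain strings and cover only a subset of types. The connector performs its own conversion internally, and details such as how VARCHAR lengths are handled may differ from this sketch.

```java
import java.util.Locale;

/**
 * Hypothetical helper: maps a subset of Flink SQL scalar type names to ORC
 * type names and builds an ORC struct schema string. Not part of flink-orc.
 */
final class OrcSchemaSketch {

    static String toOrcType(String flinkType) {
        // Normalize, e.g. "decimal(10, 2)" -> "DECIMAL(10,2)"
        String t = flinkType.replace(" ", "").toUpperCase(Locale.ROOT);

        // Parameterized types whose parameters carry over unchanged
        if (t.startsWith("DECIMAL(") || t.startsWith("CHAR(") || t.startsWith("VARCHAR(")) {
            return t.toLowerCase(Locale.ROOT);
        }
        // ORC has a single binary type, so any length parameter is dropped
        if (t.startsWith("BINARY(") || t.startsWith("VARBINARY(")) {
            return "binary";
        }
        // TIMESTAMP or TIMESTAMP(p) without a time zone
        if (t.matches("TIMESTAMP(\\(\\d+\\))?")) {
            return "timestamp";
        }
        switch (t) {
            case "BOOLEAN": return "boolean";
            case "TINYINT": return "tinyint";
            case "SMALLINT": return "smallint";
            case "INT":
            case "INTEGER": return "int";
            case "BIGINT": return "bigint";
            case "FLOAT": return "float";
            case "DOUBLE": return "double";
            case "STRING": return "string";
            case "BYTES": return "binary";
            case "DATE": return "date";
            default:
                throw new IllegalArgumentException("Not covered by this sketch: " + flinkType);
        }
    }

    static String toOrcStruct(String[] names, String[] flinkTypes) {
        if (names.length != flinkTypes.length) {
            throw new IllegalArgumentException("names and types must have the same length");
        }
        StringBuilder sb = new StringBuilder("struct<");
        for (int i = 0; i < names.length; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(names[i]).append(':').append(toOrcType(flinkTypes[i]));
        }
        return sb.append('>').toString();
    }

    public static void main(String[] args) {
        String schema = toOrcStruct(
            new String[] {"id", "name", "age", "active"},
            new String[] {"BIGINT", "STRING", "INT", "BOOLEAN"});
        System.out.println(schema); // prints struct<id:bigint,name:string,age:int,active:boolean>
    }
}
```

Composite types (ARRAY, MAP, ROW) and TIMESTAMP_WITH_LOCAL_TIME_ZONE are deliberately left out to keep the sketch short; they throw rather than silently producing a guessed name.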