Apache Flink SQL ORC format connector providing comprehensive support for reading and writing ORC files in Flink's Table API and SQL environments.
npx @tessl/cli install tessl/maven-org-apache-flink--flink-sql-orc-2-12@1.14.0

The Apache Flink SQL ORC format connector provides comprehensive support for reading and writing ORC (Optimized Row Columnar) files within Flink's Table API and SQL environments. This package is a shaded JAR that bundles the core flink-orc functionality together with all dependencies required for seamless ORC file format integration in distributed processing environments.
org.apache.flink:flink-sql-orc_2.12:1.14.6

import org.apache.flink.orc.OrcFileFormatFactory;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;

For specific functionality:
// SQL/Table API format factory
import org.apache.flink.orc.OrcFileFormatFactory;
// DataStream API writing
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
// Filter pushdown
import org.apache.flink.orc.OrcFilters;
// Input formats and readers
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.AbstractOrcFileInputFormat;

Reading and querying ORC files through the Table API and SQL:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// Create table with ORC format
tEnv.executeSql(
"CREATE TABLE orc_table (" +
" user_id BIGINT," +
" item_id BIGINT," +
" category_id BIGINT," +
" behavior STRING," +
" ts TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'file:///path/to/orc/files'," +
" 'format' = 'orc'" +
")"
);
// Query the ORC table
tEnv.executeSql("SELECT user_id, COUNT(*) FROM orc_table GROUP BY user_id").print();import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
// Create vectorizer for schema
String schema = "struct<user_id:bigint,item_id:bigint,behavior:varchar(50)>";
RowDataVectorizer vectorizer = new RowDataVectorizer(schema, new LogicalType[]{
new BigIntType(), new BigIntType(), new VarCharType(50)
});
// Create ORC writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);
// Create file sink
StreamingFileSink<RowData> sink = StreamingFileSink
.forBulkFormat(new Path("file:///path/to/output"), writerFactory)
.build();
// Write data stream to ORC files
DataStream<RowData> dataStream = // ... your data stream
dataStream.addSink(sink);

The ORC format connector is built around several key architectural components:
- OrcFileFormatFactory provides seamless integration with Flink's SQL/Table API format system
- Vectorized reading and writing built on VectorizedRowBatch and column vector abstractions
- A version shim layer (OrcShim) ensures compatibility across different Hive/ORC versions (2.0.x, 2.1.x, 2.3+)

OrcFileFormatFactory provides complete integration with Flink's SQL engine and Table API, enabling declarative ORC file access through DDL statements and programmatic table definitions.
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
public static final String IDENTIFIER = "orc";
public String factoryIdentifier();
public Set<ConfigOption<?>> requiredOptions();
public Set<ConfigOption<?>> optionalOptions();
public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}

The connector also integrates directly with Flink's DataStream API for programmatic ORC reading and writing, supporting custom data processing pipelines and streaming applications.
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);
public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration hadoopConfiguration);
public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
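// For element types other than RowData, a custom Vectorizer can be implemented
// (the abstract base class is summarized just below). This is a minimal, hedged sketch:
// the Event POJO is hypothetical, and the column-vector classes (VectorizedRowBatch,
// LongColumnVector, BytesColumnVector) come from the Hive storage API bundled with ORC.
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Hypothetical element type used only for illustration.
class Event {
    long id;
    String name;
}

public class EventVectorizer extends Vectorizer<Event> {

    public EventVectorizer() {
        // ORC schema describing the written columns
        super("struct<id:bigint,name:string>");
    }

    @Override
    public void vectorize(Event element, VectorizedRowBatch batch) throws IOException {
        int row = batch.size++; // claim the next row slot in the batch
        ((LongColumnVector) batch.cols[0]).vector[row] = element.id;
        ((BytesColumnVector) batch.cols[1]).setVal(row, element.name.getBytes(StandardCharsets.UTF_8));
    }
}
// An OrcBulkWriterFactory<Event> built from new EventVectorizer() can then be used with
// StreamingFileSink.forBulkFormat(...) exactly as in the RowData example above.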
public abstract class Vectorizer<T> {
public Vectorizer(String schema);
public TypeDescription getSchema();
public void addUserMetadata(String key, ByteBuffer value);
public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}

The connector exposes comprehensive configuration options for performance tuning, compression settings, and advanced ORC features, including custom filter pushdown and metadata handling.
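Where filter pushdown is wired manually, predicates are built from the OrcFilters helper classes listed below and passed as the conjunctPredicates argument of the ORC input formats. The sketch assumes a (column name, PredicateLeaf.Type, literal) constructor shape, which is not spelled out in these signatures, so treat it as illustrative:

import java.util.Arrays;
import java.util.List;
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

public class OrcFilterExample {
    // Builds predicates that let the reader skip row groups that cannot match.
    // Constructor shapes are assumptions; only the class names appear in the signatures below.
    public static List<OrcFilters.Predicate> buildPredicates() {
        return Arrays.asList(
                new OrcFilters.Equals("behavior", PredicateLeaf.Type.STRING, "buy"),
                new OrcFilters.Between("user_id", PredicateLeaf.Type.LONG, 1000L, 2000L));
    }
}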
public class OrcFilters {
public static Predicate toOrcPredicate(Expression expression);
public abstract static class Predicate implements Serializable { }
public static class Equals extends BinaryPredicate { }
public static class LessThan extends BinaryPredicate { }
public static class LessThanEquals extends BinaryPredicate { }
public static class IsNull extends ColumnPredicate { }
public static class Between extends ColumnPredicate { }
public static class In extends ColumnPredicate { }
}

Configuration and Advanced Usage
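On the write path, ORC properties can be supplied through the OrcBulkWriterFactory constructor that accepts writer Properties and a Hadoop Configuration. A minimal sketch, assuming the standard ORC property key orc.compress with SNAPPY as an illustrative value:

import java.util.Properties;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;

public class OrcWriterConfigExample {
    public static OrcBulkWriterFactory<RowData> snappyFactory() {
        RowDataVectorizer vectorizer = new RowDataVectorizer(
                "struct<user_id:bigint>", new LogicalType[]{new BigIntType()});

        // ORC writer properties (compression codec as an example).
        Properties writerProperties = new Properties();
        writerProperties.setProperty("orc.compress", "SNAPPY");

        // Hadoop configuration handed to the underlying ORC writer.
        Configuration hadoopConf = new Configuration();

        return new OrcBulkWriterFactory<>(vectorizer, writerProperties, hadoopConf);
    }
}

On the SQL side, the same properties are commonly passed through the table's WITH options using the orc. prefix (for example 'orc.compress' = 'SNAPPY').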
Core type definitions used throughout the ORC connector:
// Input format for reading ORC files
public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT> implements BulkFormat<T, SplitT> {
public AbstractOrcFileInputFormat(
Path[] filePaths,
TypeDescription schema,
int[] selectedFields,
List<Predicate> conjunctPredicates,
int batchSize,
Configuration orcConfig,
SerializableHadoopConfigWrapper hadoopConfigWrapper
);
public boolean isSplittable();
public abstract TypeInformation<T> getProducedType();
}
// Concrete implementation for RowData
public class OrcColumnarRowFileInputFormat<BatchT, SplitT> extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>
createPartitionedFormat(
Configuration orcConfig,
RowType tableType,
SerializableHadoopConfigWrapper hadoopConfigWrapper,
List<String> partitionKeys,
PartitionFieldExtractor<SplitT> extractor,
List<Predicate> conjunctPredicates,
int batchSize,
boolean caseSensitive
);
}
// Version compatibility interface
public interface OrcShim<BATCH> extends Serializable {
RecordReader createRecordReader(
Configuration conf,
TypeDescription schema,
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
org.apache.flink.core.fs.Path path,
long splitStart,
long splitLength
) throws IOException;
OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);
boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;
static OrcShim<VectorizedRowBatch> defaultShim();
static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);
}
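For completeness, a small sketch of selecting a shim: defaultShim() targets the ORC/Hive version bundled with the connector, while createShim accepts a Hive dependency version string. The import package and the "2.1.1" value below are assumptions for illustration.

import org.apache.flink.orc.shim.OrcShim;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class OrcShimExample {
    // Chooses a compatibility shim; the Hive version string is illustrative.
    public static OrcShim<VectorizedRowBatch> chooseShim(boolean useHive21) {
        return useHive21 ? OrcShim.createShim("2.1.1") : OrcShim.defaultShim();
    }
}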