Apache Flink SQL ORC format connector providing comprehensive support for reading and writing ORC files in Flink's Table API and SQL environments.
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Apache Flink SQL ORC format connector provides comprehensive support for reading and writing ORC (Optimized Row Columnar) files within Flink's Table API and SQL environments. This package is a shaded JAR that bundles the core flink-orc functionality along with all necessary dependencies for seamless ORC file format integration in distributed processing environments.
Maven coordinates: org.apache.flink:flink-sql-orc_2.12:1.14.6

import org.apache.flink.orc.OrcFileFormatFactory;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;

For specific functionality:
// SQL/Table API format factory
import org.apache.flink.orc.OrcFileFormatFactory;
// DataStream API writing
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
// Filter pushdown
import org.apache.flink.orc.OrcFilters;
// Input formats and readers
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.AbstractOrcFileInputFormat;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
// Set up a streaming execution environment and a Table API environment bridged onto it.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// DDL for a filesystem-backed table whose files are encoded with the ORC format.
String createOrcTableDdl = "CREATE TABLE orc_table ("
        + " user_id BIGINT,"
        + " item_id BIGINT,"
        + " category_id BIGINT,"
        + " behavior STRING,"
        + " ts TIMESTAMP(3)"
        + ") WITH ("
        + " 'connector' = 'filesystem',"
        + " 'path' = 'file:///path/to/orc/files',"
        + " 'format' = 'orc'"
        + ")";
tableEnv.executeSql(createOrcTableDdl);

// Count rows per user over the ORC-backed table and print the query result.
String countPerUserQuery = "SELECT user_id, COUNT(*) FROM orc_table GROUP BY user_id";
tableEnv.executeSql(countPerUserQuery).print();

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

// Create a vectorizer that maps RowData fields onto the ORC schema below.
// The LogicalType array lists one type per ORC struct field, in the same order
// (bigint -> BigIntType, varchar(50) -> VarCharType(50)).
String schema = "struct<user_id:bigint,item_id:bigint,behavior:varchar(50)>";
RowDataVectorizer vectorizer = new RowDataVectorizer(schema, new LogicalType[]{
        new BigIntType(), new BigIntType(), new VarCharType(50)
});

// Create the ORC writer factory. This single-argument constructor takes no writer
// Properties or Hadoop Configuration; other constructors accept them.
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Create the file sink.
// Bulk-encoded formats such as ORC roll part files only on checkpoints, so checkpointing
// must be enabled on the execution environment (e.g. env.enableCheckpointing(...));
// otherwise in-progress part files are never finalized.
StreamingFileSink<RowData> sink = StreamingFileSink
        .forBulkFormat(new Path("file:///path/to/output"), writerFactory)
        .build();

// Write the data stream to ORC files.
// Placeholder: replace yourRowDataStream with your own DataStream<RowData>,
// e.g. the output of a source or a transformation.
DataStream<RowData> dataStream = yourRowDataStream;
dataStream.addSink(sink);

The ORC format connector is built around several key architectural components:
- **Format factory**: `OrcFileFormatFactory` integrates ORC with Flink's SQL/Table API format system.
- **Vectorized processing**: reading and writing are built on ORC's `VectorizedRowBatch` and column vector abstractions.
- **Version shims**: a shim layer (`OrcShim`) ensures compatibility across different Hive/ORC versions (2.0.x, 2.1.x, 2.3+).

SQL/Table API integration: complete integration with Flink's SQL engine and Table API, providing declarative ORC file access through DDL statements and programmatic table definitions.
/**
 * Table format factory for ORC, registered under the identifier {@code "orc"} so that table DDL
 * can declare {@code 'format' = 'orc'}.
 *
 * <p>It implements both the bulk reader and the bulk writer format factory interfaces, so the
 * same identifier serves reading (decoding) and writing (encoding).
 */
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
/** Format identifier referenced by {@code 'format' = 'orc'} in a table's WITH clause. */
public static final String IDENTIFIER = "orc";
/** Returns the identifier this factory is discovered by (presumably {@link #IDENTIFIER}). */
public String factoryIdentifier();
/** Returns the options that must be set for this format. */
public Set<ConfigOption<?>> requiredOptions();
/** Returns the options that may optionally be set for this format. */
public Set<ConfigOption<?>> optionalOptions();
/**
 * Creates the read-side format that decodes ORC files into {@link RowData}.
 *
 * @param context context of the table being created
 * @param formatOptions the format options configured for the table
 * @return a bulk decoding format producing {@link RowData}
 */
public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
/**
 * Creates the write-side format that encodes {@link RowData} into ORC files.
 *
 * @param context context of the table being created
 * @param formatOptions the format options configured for the table
 * @return an encoding format yielding a {@code BulkWriter.Factory} for {@link RowData}
 */
public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}

Direct integration with Flink's DataStream API for programmatic ORC reading and writing, supporting custom data processing pipelines and streaming applications.
/**
 * {@code BulkWriter.Factory} that writes elements of type {@code T} as ORC, using a
 * {@link Vectorizer} to convert each element into ORC column vectors.
 */
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
/** Creates a factory with no explicit writer properties or configuration (presumably defaults). */
public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
/** @param writerConfiguration configuration for the ORC writer (presumably a Hadoop {@code Configuration} — confirm). */
public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);
/**
 * @param writerProperties ORC writer properties (NOTE(review): confirm the supported keys, e.g. compression)
 * @param hadoopConfiguration Hadoop configuration used by the writer
 */
public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration hadoopConfiguration);
/** Creates a writer that encodes elements into the given output stream. */
public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
/**
 * Converts elements of type {@code T} into an ORC {@code VectorizedRowBatch}; passed to
 * {@link OrcBulkWriterFactory} to define how records are written.
 */
public abstract class Vectorizer<T> {
/** @param schema ORC type description string, e.g. {@code "struct<user_id:bigint,item_id:bigint>"} */
public Vectorizer(String schema);
/** Returns the ORC schema (presumably parsed from the constructor's schema string). */
public TypeDescription getSchema();
/** Adds a user metadata key/value entry (presumably stored in the ORC file being written — confirm). */
public void addUserMetadata(String key, ByteBuffer value);
/**
 * Writes {@code element} into {@code batch}.
 *
 * <p>NOTE(review): implementations presumably append one row and advance {@code batch.size} — confirm.
 *
 * @throws IOException if the element cannot be written
 */
public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}

Comprehensive configuration options for performance tuning, compression settings, and advanced ORC features including custom filters and metadata handling.
/**
 * Filter-pushdown support: a predicate model for filters applied while reading ORC files. Lists of
 * these predicates are passed to the ORC input formats and shims as {@code conjunctPredicates}.
 *
 * <p>NOTE(review): this listing is partial — the base classes {@code ColumnPredicate} and
 * {@code BinaryPredicate}, and any further predicate types, are not shown.
 */
public class OrcFilters {
/**
 * Converts a Flink filter {@code expression} into an ORC {@link Predicate}.
 *
 * <p>NOTE(review): presumably returns {@code null} when the expression cannot be converted — confirm.
 */
public static Predicate toOrcPredicate(Expression expression);
/** Base type of all ORC filter predicates. */
public abstract static class Predicate implements Serializable { }
/** Column equals a literal. */
public static class Equals extends BinaryPredicate { }
/** Column is less than a literal. */
public static class LessThan extends BinaryPredicate { }
/** Column is less than or equal to a literal. */
public static class LessThanEquals extends BinaryPredicate { }
/** Column is null. */
public static class IsNull extends ColumnPredicate { }
/** Column lies between two literals. */
public static class Between extends ColumnPredicate { }
/** Column is one of a set of literals. */
public static class In extends ColumnPredicate { }
}

Configuration and Advanced Usage
Core type definitions used throughout the ORC connector:
/**
 * Base input format for reading ORC files as a Flink {@code BulkFormat}.
 *
 * <p>{@code T} is the produced record type, {@code BatchT} the batch type read by the underlying
 * shim (e.g. {@code VectorizedRowBatch}), and {@code SplitT} the file split type.
 *
 * <p>NOTE(review): verify this constructor signature against the 1.14.6 sources. {@code Predicate}
 * here presumably means {@code OrcFilters.Predicate}, and {@code SplitT} is presumably bounded by
 * {@code FileSourceSplit}, as in {@code OrcColumnarRowFileInputFormat.createPartitionedFormat}.
 */
public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT> implements BulkFormat<T, SplitT> {
/**
 * @param filePaths files to read
 * @param schema ORC schema of the files
 * @param selectedFields fields to read (presumably indices into {@code schema})
 * @param conjunctPredicates filter predicates to push into the ORC reader, combined with AND
 * @param batchSize number of rows per read batch
 * @param orcConfig ORC reader configuration
 * @param hadoopConfigWrapper serializable wrapper around a Hadoop configuration
 */
public AbstractOrcFileInputFormat(
Path[] filePaths,
TypeDescription schema,
int[] selectedFields,
List<Predicate> conjunctPredicates,
int batchSize,
Configuration orcConfig,
SerializableHadoopConfigWrapper hadoopConfigWrapper
);
/** Returns whether input files may be split across multiple readers. */
public boolean isSplittable();
/** Returns the type information of the records this format produces. */
public abstract TypeInformation<T> getProducedType();
}
/** Concrete ORC input format producing {@code RowData} (columnar rows) for the Table/SQL runtime. */
public class OrcColumnarRowFileInputFormat<BatchT, SplitT> extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
/**
 * Creates a format for a (possibly partitioned) table that reads {@code VectorizedRowBatch}es.
 *
 * @param orcConfig ORC reader configuration
 * @param tableType row type of the table, including partition columns
 * @param hadoopConfigWrapper serializable wrapper around a Hadoop configuration
 * @param partitionKeys names of the partition columns
 * @param extractor supplies partition column values for a split (presumably derived from the split's path — confirm)
 * @param conjunctPredicates filter predicates to push into the ORC reader, combined with AND
 * @param batchSize number of rows per read batch
 * @param caseSensitive presumably whether field names are matched case-sensitively against the file schema — confirm
 */
public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>
createPartitionedFormat(
Configuration orcConfig,
RowType tableType,
SerializableHadoopConfigWrapper hadoopConfigWrapper,
List<String> partitionKeys,
PartitionFieldExtractor<SplitT> extractor,
List<Predicate> conjunctPredicates,
int batchSize,
boolean caseSensitive
);
}
/**
 * Version-compatibility shim over the ORC/Hive reader APIs (covering Hive 2.0.x, 2.1.x, 2.3+).
 * {@code BATCH} is the batch type the shim reads into, e.g. {@code VectorizedRowBatch}.
 */
public interface OrcShim<BATCH> extends Serializable {
/**
 * Creates an ORC record reader for one split of a file.
 *
 * @param conf reader configuration (presumably Hadoop's {@code Configuration})
 * @param schema ORC schema of the file
 * @param selectedFields fields to read (presumably indices into {@code schema})
 * @param conjunctPredicates filter predicates to push down, combined with AND
 * @param path file to read
 * @param splitStart start offset of the split (presumably in bytes)
 * @param splitLength length of the split (presumably in bytes)
 */
RecordReader createRecordReader(
Configuration conf,
TypeDescription schema,
int[] selectedFields,
List<OrcFilters.Predicate> conjunctPredicates,
org.apache.flink.core.fs.Path path,
long splitStart,
long splitLength
) throws IOException;
/** Creates a batch wrapper for {@code schema} sized for {@code batchSize} rows (presumably a capacity). */
OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);
/** Reads the next batch from {@code reader} into {@code batch}; presumably returns false at end of input. */
boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;
/** Returns the default shim. */
static OrcShim<VectorizedRowBatch> defaultShim();
/** Returns a shim matching the given Hive dependency version string. */
static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);
}