Apache Flink SQL ORC format connector providing comprehensive support for reading and writing ORC files in Flink's Table API and SQL environments.
—
Complete integration with Flink's SQL engine and Table API, providing declarative ORC file access through DDL statements and programmatic table definitions with full support for partitioned tables, filter pushdown, and vectorized processing.
The main entry point for integrating ORC format with Flink's Table API and SQL engine.
/**
* Factory for creating ORC format in Flink SQL/Table API contexts.
* Provides both reading and writing capabilities with comprehensive configuration options.
*/
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
/** Format identifier for SQL DDL statements */
public static final String IDENTIFIER = "orc";
/** Returns the unique identifier for this format */
public String factoryIdentifier();
/** Returns the set of required configuration options */
public Set<ConfigOption<?>> requiredOptions();
/** Returns the set of optional configuration options */
public Set<ConfigOption<?>> optionalOptions();
/** Creates a decoding format for reading ORC files */
public BulkDecodingFormat<RowData> createDecodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions
);
/** Creates an encoding format for writing ORC files */
public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions
);
}Usage Example:
// SQL DDL for creating ORC table
tEnv.executeSql(
"CREATE TABLE sales_data (" +
" transaction_id BIGINT," +
" user_id BIGINT," +
" product_id BIGINT," +
" amount DECIMAL(10,2)," +
" transaction_time TIMESTAMP(3)," +
" region STRING" +
") PARTITIONED BY (region) " +
"WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'hdfs://namenode:port/path/to/sales'," +
" 'format' = 'orc'," +
" 'orc.compress' = 'snappy'," +
" 'orc.stripe.size' = '64MB'" +
")"
);Specialized input format for reading ORC files with vectorized processing and partition support.
/**
* Abstract base class for ORC input formats providing vectorized reading capabilities
* @param <T> The type of records produced by the format
* @param <BatchT> The type of batch used internally (e.g., VectorizedRowBatch)
* @param <SplitT> The type of input split
*/
public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT> implements BulkFormat<T, SplitT> {
/**
* Constructor for ORC input format
* @param filePaths Array of file paths to read
* @param schema ORC schema description
* @param selectedFields Indices of fields to read (for projection)
* @param conjunctPredicates List of predicates for pushdown filtering
* @param batchSize Size of vectorized batches
* @param orcConfig ORC-specific configuration
* @param hadoopConfigWrapper Serializable Hadoop configuration
*/
public AbstractOrcFileInputFormat(
Path[] filePaths,
TypeDescription schema,
int[] selectedFields,
List<Predicate> conjunctPredicates,
int batchSize,
Configuration orcConfig,
SerializableHadoopConfigWrapper hadoopConfigWrapper
);
/** Returns true as ORC files support splitting */
public boolean isSplittable();
/** Abstract method to return the produced type information */
public abstract TypeInformation<T> getProducedType();
}
/**
* Concrete ORC input format that produces RowData with columnar processing
*/
public class OrcColumnarRowFileInputFormat<BatchT, SplitT>
extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
/** Constructor for non-partitioned tables */
public OrcColumnarRowFileInputFormat(
Path[] filePaths,
String[] fieldNames,
LogicalType[] fieldTypes,
int[] selectedFields,
List<Predicate> conjunctPredicates,
int batchSize,
Configuration orcConfig,
SerializableHadoopConfigWrapper hadoopConfigWrapper
);
/**
* Factory method for creating partitioned table format
* @param orcConfig ORC configuration
* @param tableType Row type of the table schema
* @param hadoopConfigWrapper Hadoop configuration
* @param partitionKeys List of partition column names
* @param extractor Partition field extractor
* @param conjunctPredicates Filter predicates
* @param batchSize Vectorized batch size
* @param caseSensitive Whether names are case sensitive
* @return Configured input format for partitioned tables
*/
public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>
createPartitionedFormat(
Configuration orcConfig,
RowType tableType,
SerializableHadoopConfigWrapper hadoopConfigWrapper,
List<String> partitionKeys,
PartitionFieldExtractor<SplitT> extractor,
List<Predicate> conjunctPredicates,
int batchSize,
boolean caseSensitive
);
/** Returns RowData type information */
public TypeInformation<RowData> getProducedType();
}Usage Example:
// Programmatic table creation with ORC format
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("user_id", DataTypes.BIGINT())
.column("product_id", DataTypes.BIGINT())
.column("purchase_time", DataTypes.TIMESTAMP_LTZ(3))
.column("amount", DataTypes.DECIMAL(10, 2))
.build())
.partitionedBy("region")
.format("orc")
.option("path", "/path/to/orc/data")
.option("orc.compress", "zstd")
.build();
tEnv.createTable("purchases", descriptor);Internal components for reading ORC file splits with vectorized processing.
/**
* Split reader for ORC files that produces RowData from vectorized batches
*/
public class OrcColumnarRowSplitReader<BATCH> extends OrcSplitReader<RowData, BATCH> {
/**
* Constructor for columnar row split reader
* @param orcShim Version compatibility shim
* @param orcConfig ORC configuration
* @param fieldNames Array of field names
* @param fieldTypes Array of logical field types
* @param selectedFields Indices of selected fields for projection
* @param conjunctPredicates Filter predicates for pushdown
* @param batchSize Size of vectorized batches
* @param split File split to read
* @param generator Batch generator for creating column batches
*/
public OrcColumnarRowSplitReader(
OrcShim<BATCH> orcShim,
Configuration orcConfig,
String[] fieldNames,
LogicalType[] fieldTypes,
int[] selectedFields,
List<Predicate> conjunctPredicates,
int batchSize,
SplitT split,
ColumnBatchGenerator<BATCH> generator
);
/** Reads the next record as RowData */
public RowData nextRecord(RowData reuse) throws IOException;
/**
* Interface for generating column batches from different batch types
*/
public interface ColumnBatchGenerator<BATCH> extends Serializable {
VectorizedColumnBatch generate(BATCH batch);
}
}
/**
* Abstract base class for ORC split readers with batch processing capabilities
*/
public abstract class OrcSplitReader<T, BATCH> implements Closeable {
/**
* Constructor for ORC split reader
* @param orcShim Version compatibility shim
* @param orcConfig ORC configuration
* @param split File split to read
* @param batchSize Size of vectorized batches
*/
public OrcSplitReader(
OrcShim<BATCH> orcShim,
Configuration orcConfig,
SplitT split,
int batchSize
);
/** Seek to a specific row number */
public void seekToRow(long rowNumber) throws IOException;
/** Check if the end of input has been reached */
public boolean reachedEnd();
/** Abstract method for reading the next record */
public abstract T nextRecord(T reuse) throws IOException;
/** Close the reader and release resources */
public void close() throws IOException;
}Helper classes for ORC reader creation and type conversions in Table API contexts.
/**
* Utility class for creating ORC readers and performing type conversions
*/
public class OrcSplitReaderUtil {
/**
* Generate a partitioned columnar row reader
* @param orcShim Version compatibility shim
* @param orcConfig ORC configuration
* @param fieldNames Field names in the schema
* @param fieldTypes Logical types of the fields
* @param selectedFields Selected field indices for projection
* @param conjunctPredicates Filter predicates
* @param batchSize Vectorized batch size
* @param split File split to read
* @param partitionKeys Partition column names
* @param defaultPartName Default partition name for null values
* @param extractor Partition field extractor
* @return Configured partitioned reader
*/
public static <SplitT extends FileSourceSplit> OrcColumnarRowSplitReader<VectorizedRowBatch>
genPartColumnarRowReader(
OrcShim<VectorizedRowBatch> orcShim,
Configuration orcConfig,
String[] fieldNames,
LogicalType[] fieldTypes,
int[] selectedFields,
List<Predicate> conjunctPredicates,
int batchSize,
SplitT split,
List<String> partitionKeys,
String defaultPartName,
PartitionFieldExtractor<SplitT> extractor
);
/** Get selected field indices from ORC schema and projection */
public static int[] getSelectedOrcFields(
RowType tableType,
int[] selectedFields,
List<String> partitionKeys
);
/** Filter out partition column names from schema */
public static List<String> getNonPartNames(List<String> fieldNames, List<String> partitionKeys);
/** Convert Flink row type to ORC type with partition support */
public static TypeDescription convertToOrcTypeWithPart(RowType type, List<String> partitionKeys);
/** Convert Flink logical type to ORC TypeDescription */
public static TypeDescription logicalTypeToOrcType(LogicalType type);
}Usage Example:
// Query with partition pruning and filter pushdown
tEnv.executeSql(
"SELECT user_id, SUM(amount) as total_spent " +
"FROM sales_data " +
"WHERE region = 'US' AND transaction_time > TIMESTAMP '2023-01-01 00:00:00' " +
"GROUP BY user_id " +
"HAVING total_spent > 1000"
).print();Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-orc-2-12