Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files.

The ORC format integrates with Flink's Table API through the `OrcFileFormatFactory`, providing both reading and writing capabilities under the format identifier `"orc"`.
// Table factory for the ORC file format. Registered under the "orc" identifier
// and used by the filesystem connector; implements both the reading side
// (BulkReaderFormatFactory) and the writing side (BulkWriterFormatFactory).
// NOTE: method bodies are omitted in this API summary.
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    // Value of the 'format' option in CREATE TABLE DDL.
    public static final String IDENTIFIER = "orc";
    // Factory identifier reported to Flink's factory discovery.
    // NOTE(review): presumably returns IDENTIFIER ("orc") — confirm against implementation.
    public String factoryIdentifier();
    // Options that must be present in the table's WITH clause.
    public Set<ConfigOption<?>> requiredOptions();
    // Options that may be present (e.g. the orc.* pass-through properties).
    public Set<ConfigOption<?>> optionalOptions();
    // Builds the reading side: a BulkFormat-based decoder created from the
    // table context and the format options.
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
    // Builds the writing side: a BulkWriter factory created from the table
    // context and the format options.
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context,
        ReadableConfig formatOptions
    );
}

// Batch-mode TableEnvironment used by the examples that follow.
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
// Simple ORC table
tableEnv.executeSql(
"CREATE TABLE orders (" +
" order_id BIGINT," +
" customer_id INT," +
" product_name STRING," +
" quantity INT," +
" price DECIMAL(10,2)," +
" order_date DATE," +
" created_at TIMESTAMP(3)" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = '/data/orders'," +
" 'format' = 'orc'" +
")"
);// Partitioned table
tableEnv.executeSql(
"CREATE TABLE partitioned_orders (" +
" order_id BIGINT," +
" customer_id INT," +
" product_name STRING," +
" quantity INT," +
" price DECIMAL(10,2)," +
" order_date DATE," +
" created_at TIMESTAMP(3)," +
" year INT," +
" month INT" +
") PARTITIONED BY (year, month) " +
"WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = '/data/partitioned_orders'," +
" 'format' = 'orc'" +
")"
);// Read from ORC table
// Read the ORC-backed "orders" table and keep only high-value rows.
Table orders = tableEnv.from("orders");
Table filtered = orders.filter($("price").isGreater(100.0));
Table highValueOrders = filtered.select($("order_id"), $("customer_id"), $("price"));

// Write to an ORC table via SQL: same predicate, projected columns only.
String insertSql =
    "INSERT INTO high_value_orders " +
    "SELECT order_id, customer_id, price " +
    "FROM orders " +
    "WHERE price > 100.0";
tableEnv.executeSql(insertSql);
// The ORC format supports various configuration options through Hadoop
// Configuration properties:
// Table with ORC-specific options
tableEnv.executeSql(
"CREATE TABLE configured_orders (" +
" order_id BIGINT," +
" data STRING" +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = '/data/configured_orders'," +
" 'format' = 'orc'," +
" 'orc.compress' = 'SNAPPY'," +
" 'orc.stripe.size' = '67108864'," +
" 'orc.row.index.stride' = '10000'" +
")"
);Common ORC configuration properties that can be used with the orc. prefix:
- `orc.compress`: Compression codec (NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD)
- `orc.stripe.size`: Target stripe size in bytes (default: 67108864)
- `orc.row.index.stride`: Number of rows between index entries (default: 10000)
- `orc.create.index`: Whether to create row group indexes (default: true)
- `orc.bloom.filter.columns`: Columns for bloom filter creation
- `orc.bloom.filter.fpp`: Bloom filter false positive probability

@VisibleForTesting
// Decoding side of the ORC format. Supports projection push-down
// (ProjectableDecodingFormat), filter push-down (applyFilters) and statistics
// reporting from file metadata (FileBasedStatisticsReportableInputFormat).
// NOTE: method bodies are omitted in this API summary.
public static class OrcBulkDecodingFormat
    implements BulkDecodingFormat<RowData>,
    ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
    FileBasedStatisticsReportableInputFormat {
    // Format options from the table definition (incl. orc.* properties).
    public OrcBulkDecodingFormat(ReadableConfig formatOptions);
    // Creates the runtime reader; 'projections' lists the (nested) column
    // index paths that must be read — projection push-down.
    public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
        DynamicTableSource.Context sourceContext,
        DataType producedDataType,
        int[][] projections
    );
    // NOTE(review): presumably insert-only for a file source — confirm.
    public ChangelogMode getChangelogMode();
    // Receives resolved filter expressions for push-down into the ORC reader.
    public void applyFilters(List<ResolvedExpression> filters);
    // Derives TableStats from the metadata of the given ORC files.
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
// The format automatically supports column projection, reading only the
// required columns:
// Only reads order_id and price columns from ORC files
Table result = tableEnv.sqlQuery(
"SELECT order_id, price FROM orders WHERE customer_id = 12345"
);Supported filter predicates are automatically pushed down to the ORC reader:
// Filters are pushed down to ORC level
Table result = tableEnv.sqlQuery(
"SELECT * FROM orders " +
"WHERE order_date >= DATE '2023-01-01' " +
"AND price BETWEEN 50.0 AND 500.0 " +
"AND product_name IS NOT NULL"
);The format can report table statistics from ORC file metadata:
// Statistics are automatically extracted from ORC file metadata, without a
// full scan of the data files.
// NOTE(review): 'files' and 'producedDataType' must describe the same table —
// confirm expected inputs against the OrcBulkDecodingFormat implementation.
TableStats stats = bulkDecodingFormat.reportStatistics(files, producedDataType);
// Using with Hive catalog
// Point the TableEnvironment at a Hive catalog so tables are created there.
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "/path/to/hive-conf");
tableEnv.registerCatalog("hive", hiveCatalog);
tableEnv.useCatalog("hive");

// Hive DDL: STORED AS ORC with an explicit warehouse location.
String hiveOrcDdl =
    "CREATE TABLE hive.default.orc_table (" +
    " id BIGINT," +
    " name STRING" +
    ") STORED AS ORC " +
    "LOCATION '/warehouse/orc_table'";
tableEnv.executeSql(hiveOrcDdl);
// Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-orc