
tessl/maven-org-apache-flink--flink-orc

Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files


Table API Integration

The ORC format integrates with Flink's Table API through OrcFileFormatFactory, which supplies both a bulk reader and a bulk writer under the format identifier "orc".

Format Factory

public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";
    
    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
    
    public BulkDecodingFormat<RowData> createDecodingFormat(
        DynamicTableFactory.Context context, 
        ReadableConfig formatOptions
    );
    
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
        DynamicTableFactory.Context context, 
        ReadableConfig formatOptions
    );
}

Usage Examples

Creating ORC Tables

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Simple ORC table
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id BIGINT," +
    "  customer_id INT," +
    "  product_name STRING," +
    "  quantity INT," +
    "  price DECIMAL(10,2)," +
    "  order_date DATE," +
    "  created_at TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/data/orders'," +
    "  'format' = 'orc'" +
    ")"
);

Partitioned ORC Tables

// Partitioned table
tableEnv.executeSql(
    "CREATE TABLE partitioned_orders (" +
    "  order_id BIGINT," +
    "  customer_id INT," +
    "  product_name STRING," +
    "  quantity INT," +
    "  price DECIMAL(10,2)," +
    "  order_date DATE," +
    "  created_at TIMESTAMP(3)," +
    "  year INT," +
    "  month INT" +
    ") PARTITIONED BY (year, month) " +
    "WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/data/partitioned_orders'," +
    "  'format' = 'orc'" +
    ")"
);
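The filesystem connector lays out partitioned tables in Hive-style directories, one key=value segment per partition column. The sketch below only illustrates that layout for the year and month columns above. PartitionPathSketch and partitionPath are hypothetical names for this example, not part of Flink.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative helper (not part of Flink): builds a Hive-style partition directory. */
final class PartitionPathSketch {

    /** Appends key=value segments to the base path in the map's iteration order. */
    static String partitionPath(String basePath, Map<String, ?> partitionSpec) {
        StringJoiner path = new StringJoiner("/");
        // Drop a trailing slash so the joined path has no empty segment.
        path.add(basePath.endsWith("/")
                ? basePath.substring(0, basePath.length() - 1)
                : basePath);
        for (Map.Entry<String, ?> entry : partitionSpec.entrySet()) {
            path.add(entry.getKey() + "=" + entry.getValue());
        }
        return path.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the PARTITIONED BY order: year first, then month.
        Map<String, Object> spec = new LinkedHashMap<>();
        spec.put("year", 2023);
        spec.put("month", 6);
        // prints /data/partitioned_orders/year=2023/month=6
        System.out.println(partitionPath("/data/partitioned_orders", spec));
    }
}
```

The ORC files for each partition are written under the resulting directory, so a query that filters on year and month only needs to list the matching directories.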

Reading and Writing

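// Requires: import static org.apache.flink.table.api.Expressions.$;

// Read from the ORC table
Table orders = tableEnv.from("orders");
Table highValueOrders = orders
    .filter($("price").isGreater(100.0))
    .select($("order_id"), $("customer_id"), $("price"));

// Write to an ORC table (high_value_orders must already be defined with a matching schema)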
tableEnv.executeSql(
    "INSERT INTO high_value_orders " +
    "SELECT order_id, customer_id, price " +
    "FROM orders " +
    "WHERE price > 100.0"
);

Format Options

Configuration Properties

Table options that start with orc. are passed through to the ORC reader and writer as Hadoop Configuration properties:

// Table with ORC-specific options
tableEnv.executeSql(
    "CREATE TABLE configured_orders (" +
    "  order_id BIGINT," +
    "  data STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/data/configured_orders'," +
    "  'format' = 'orc'," +
    "  'orc.compress' = 'SNAPPY'," +
    "  'orc.stripe.size' = '67108864'," +
    "  'orc.row.index.stride' = '10000'" +
    ")"
);

Supported ORC Properties

Common ORC configuration properties that can be used with the orc. prefix:

  • orc.compress: Compression codec (NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD)
  • orc.stripe.size: Target stripe size in bytes (default: 67108864)
  • orc.row.index.stride: Number of rows between index entries (default: 10000)
  • orc.create.index: Whether to create row group indexes (default: true)
  • orc.bloom.filter.columns: Columns for bloom filter creation
  • orc.bloom.filter.fpp: Bloom filter false positive probability
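When these options are assembled programmatically, it can help to check keys and codec names before the DDL reaches Flink. The following is a minimal sketch under that assumption. OrcWithClauseSketch, withClause, and mebibytes are hypothetical names for this example and are not part of flink-orc. The codec list is copied from the bullet above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of flink-orc): renders a WITH clause for an ORC table. */
final class OrcWithClauseSketch {

    // Codec names as listed in this section.
    private static final List<String> CODECS =
            Arrays.asList("NONE", "ZLIB", "SNAPPY", "LZO", "LZ4", "ZSTD");

    static String withClause(String path, Map<String, String> orcOptions) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", path);
        options.put("format", "orc");
        for (Map.Entry<String, String> option : orcOptions.entrySet()) {
            if (!option.getKey().startsWith("orc.")) {
                throw new IllegalArgumentException(
                        "Expected an orc.-prefixed key: " + option.getKey());
            }
            if (option.getKey().equals("orc.compress")
                    && !CODECS.contains(option.getValue().toUpperCase(Locale.ROOT))) {
                throw new IllegalArgumentException("Unknown codec: " + option.getValue());
            }
            options.put(option.getKey(), option.getValue());
        }
        // Single quotes inside values are doubled, as SQL string literals require.
        return options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(", ", "WITH (", ")"));
    }

    /** Converts mebibytes to bytes; 64 MiB is the 67108864 default stripe size above. */
    static long mebibytes(long mib) {
        return mib * 1024L * 1024L;
    }

    public static void main(String[] args) {
        Map<String, String> orc = new LinkedHashMap<>();
        orc.put("orc.compress", "SNAPPY");
        orc.put("orc.stripe.size", String.valueOf(mebibytes(64)));
        System.out.println(withClause("/data/configured_orders", orc));
    }
}
```

The returned string can be appended to a CREATE TABLE statement in place of the hand-written WITH clause shown earlier.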

Bulk Decoding Format

@VisibleForTesting
public static class OrcBulkDecodingFormat 
        implements BulkDecodingFormat<RowData>, 
                   ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
                   FileBasedStatisticsReportableInputFormat {
    
    public OrcBulkDecodingFormat(ReadableConfig formatOptions);
    
    public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
        DynamicTableSource.Context sourceContext,
        DataType producedDataType,
        int[][] projections
    );
    
    public ChangelogMode getChangelogMode();
    public void applyFilters(List<ResolvedExpression> filters);
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}

Features

Projection Pushdown

The format automatically supports column projection, reading only the required columns:

// Only the columns the query references need to be read from the ORC files:
// order_id and price for the result, plus customer_id for the filter
Table result = tableEnv.sqlQuery(
    "SELECT order_id, price FROM orders WHERE customer_id = 12345"
);
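The int[][] projections argument of createRuntimeDecoder carries this column selection. The sketch below assumes each inner array is an index path into the table's row type and handles only top-level fields (paths of length one). ProjectionSketch and projectedColumns are hypothetical names for this example, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative only: resolves top-level projection index paths to column names. */
final class ProjectionSketch {

    static List<String> projectedColumns(List<String> fieldNames, int[][] projections) {
        List<String> selected = new ArrayList<>();
        for (int[] path : projections) {
            // A longer path would address a nested field, which this sketch does not cover.
            if (path.length != 1) {
                throw new IllegalArgumentException(
                        "Only top-level projections are handled in this sketch");
            }
            selected.add(fieldNames.get(path[0]));
        }
        return selected;
    }

    public static void main(String[] args) {
        // Column order of the orders table defined earlier.
        List<String> fields = Arrays.asList(
                "order_id", "customer_id", "product_name", "quantity",
                "price", "order_date", "created_at");
        // prints [order_id, price]
        System.out.println(projectedColumns(fields, new int[][] {{0}, {4}}));
    }
}
```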

Filter Pushdown

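Supported filter predicates are automatically pushed down to the ORC reader, which can use them together with the min/max statistics stored in each file to skip data that cannot match: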

// Filters are pushed down to ORC level
Table result = tableEnv.sqlQuery(
    "SELECT * FROM orders " +
    "WHERE order_date >= DATE '2023-01-01' " +
    "AND price BETWEEN 50.0 AND 500.0 " +
    "AND product_name IS NOT NULL"
);
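ORC files store min/max statistics per column, and a pushed-down range predicate can be checked against them to decide whether a stripe needs to be read at all. The sketch below is a conceptual illustration of that overlap test for the price BETWEEN 50.0 AND 500.0 predicate. It is not the ORC or Flink implementation, and StripePruningSketch, ColumnRange, and the sample statistics are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Conceptual sketch of statistics-based stripe skipping (not ORC or Flink code). */
final class StripePruningSketch {

    /** Min and max of one column within one stripe (hypothetical type). */
    static final class ColumnRange {
        final double min;
        final double max;

        ColumnRange(double min, double max) {
            this.min = min;
            this.max = max;
        }
    }

    /** "col BETWEEN lo AND hi" can match only if [min, max] overlaps [lo, hi]. */
    static boolean mightMatchBetween(ColumnRange stats, double lo, double hi) {
        return stats.max >= lo && stats.min <= hi;
    }

    /** Returns the indexes of the stripes that still have to be read. */
    static List<Integer> stripesToRead(List<ColumnRange> stripes, double lo, double hi) {
        List<Integer> keep = new ArrayList<>();
        for (int i = 0; i < stripes.size(); i++) {
            if (mightMatchBetween(stripes.get(i), lo, hi)) {
                keep.add(i);
            }
        }
        return keep;
    }

    public static void main(String[] args) {
        // Made-up price statistics for three stripes.
        List<ColumnRange> priceStats = Arrays.asList(
                new ColumnRange(1.0, 40.0),
                new ColumnRange(45.0, 120.0),
                new ColumnRange(600.0, 900.0));
        // prints [1]: only the second stripe can contain prices in [50, 500]
        System.out.println(stripesToRead(priceStats, 50.0, 500.0));
    }
}
```

Skipping is conservative: a stripe that overlaps the range may still contain rows outside it, so the predicate is still evaluated on the rows that are read.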

Statistics Reporting

The format can report table statistics from ORC file metadata:

// Statistics are extracted from ORC file metadata.
// bulkDecodingFormat, files, and producedDataType are assumed to be in scope here.
TableStats stats = bulkDecodingFormat.reportStatistics(files, producedDataType);

Integration with Catalogs

// Using with Hive catalog
HiveCatalog catalog = new HiveCatalog("hive", "default", "/path/to/hive-conf");
tableEnv.registerCatalog("hive", catalog);
tableEnv.useCatalog("hive");

// STORED AS and LOCATION are Hive DDL, so switch to the Hive dialect first
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Create ORC table in the current catalog (hive) and database (default)
tableEnv.executeSql(
    "CREATE TABLE orc_table (" +
    "  id BIGINT," +
    "  name STRING" +
    ") STORED AS ORC " +
    "LOCATION '/warehouse/orc_table'"
);
