# Table API Integration
The ORC format integrates with Flink's Table API through the `OrcFileFormatFactory`, providing both reading and writing capabilities with the format identifier `"orc"`.
## Format Factory
```java { .api }
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";

    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();

    public BulkDecodingFormat<RowData> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);

    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions);
}
```
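The factory is registered via Java SPI and resolved by its identifier. A minimal sketch of that lookup, assuming the `flink-orc` jar is on the classpath (package locations for the factory interfaces vary across Flink versions):

```java
// Hypothetical illustration of what the filesystem connector does
// internally when it sees 'format' = 'orc' in a table definition.
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.table.factories.FactoryUtil;

BulkReaderFormatFactory factory = FactoryUtil.discoverFactory(
        Thread.currentThread().getContextClassLoader(),
        BulkReaderFormatFactory.class,
        "orc");  // matches OrcFileFormatFactory.IDENTIFIER
```

In normal use you never call this yourself; declaring `'format' = 'orc'` in the `WITH` clause is enough.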
## Usage Examples
### Creating ORC Tables
```java
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

// Simple ORC table
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id BIGINT," +
    "  customer_id INT," +
    "  product_name STRING," +
    "  quantity INT," +
    "  price DECIMAL(10,2)," +
    "  order_date DATE," +
    "  created_at TIMESTAMP(3)" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/data/orders'," +
    "  'format' = 'orc'" +
    ")"
);
```
### Partitioned ORC Tables
```java
// Partitioned table. YEAR and MONTH are reserved keywords in Flink SQL,
// so the partition columns must be quoted with backticks.
tableEnv.executeSql(
    "CREATE TABLE partitioned_orders (" +
    "  order_id BIGINT," +
    "  customer_id INT," +
    "  product_name STRING," +
    "  quantity INT," +
    "  price DECIMAL(10,2)," +
    "  order_date DATE," +
    "  created_at TIMESTAMP(3)," +
    "  `year` INT," +
    "  `month` INT" +
    ") PARTITIONED BY (`year`, `month`) " +
    "WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/data/partitioned_orders'," +
    "  'format' = 'orc'" +
    ")"
);
```
### Reading and Writing
```java
import static org.apache.flink.table.api.Expressions.$;

// Read from the ORC-backed table
Table orders = tableEnv.from("orders");
Table highValueOrders = orders
    .filter($("price").isGreater(100.0))
    .select($("order_id"), $("customer_id"), $("price"));

// Write to an ORC table (high_value_orders must be declared with 'format' = 'orc')
tableEnv.executeSql(
    "INSERT INTO high_value_orders " +
    "SELECT order_id, customer_id, price " +
    "FROM orders " +
    "WHERE price > 100.0"
);
```
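The same `INSERT` works in streaming mode, but ORC is a bulk format, so the filesystem sink can only finalize files on checkpoints. A sketch of the required setup (the checkpoint interval here is an arbitrary example value):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Without checkpointing, in-progress ORC files are never committed
// and remain invisible to downstream readers.
env.enableCheckpointing(60_000L); // finalize files roughly every 60 seconds
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
```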
## Format Options
### Configuration Properties
The ORC format forwards table options carrying the `orc.` prefix to the underlying reader and writer as Hadoop Configuration properties:
```java
// Table with ORC-specific options
tableEnv.executeSql(
    "CREATE TABLE configured_orders (" +
    "  order_id BIGINT," +
    "  data STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/data/configured_orders'," +
    "  'format' = 'orc'," +
    "  'orc.compress' = 'SNAPPY'," +
    "  'orc.stripe.size' = '67108864'," +
    "  'orc.row.index.stride' = '10000'" +
    ")"
);
```
### Supported ORC Properties
Common ORC configuration properties that can be used with the `orc.` prefix:
- `orc.compress`: Compression codec (`NONE`, `ZLIB`, `SNAPPY`, `LZO`, `LZ4`, `ZSTD`)
- `orc.stripe.size`: Target stripe size in bytes (default: 67108864, i.e. 64 MB)
- `orc.row.index.stride`: Number of rows between index entries (default: 10000)
- `orc.create.index`: Whether to create row group indexes (default: true)
- `orc.bloom.filter.columns`: Comma-separated list of columns to build bloom filters for
- `orc.bloom.filter.fpp`: Bloom filter false positive probability (default: 0.05)
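As an illustration, a table definition that combines ZSTD compression with bloom filters on the lookup columns (the table name and path are hypothetical):

```java
tableEnv.executeSql(
    "CREATE TABLE indexed_orders (" +
    "  order_id BIGINT," +
    "  customer_id INT" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/data/indexed_orders'," +
    "  'format' = 'orc'," +
    "  'orc.compress' = 'ZSTD'," +
    // Bloom filters help point lookups on these columns skip stripes entirely
    "  'orc.bloom.filter.columns' = 'order_id,customer_id'," +
    "  'orc.bloom.filter.fpp' = '0.05'" +
    ")"
);
```

Bloom filters add write-time cost and file size, so restrict them to columns that are actually used in equality predicates.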
## Bulk Decoding Format
```java { .api }
@VisibleForTesting
public static class OrcBulkDecodingFormat
        implements BulkDecodingFormat<RowData>,
                ProjectableDecodingFormat<BulkFormat<RowData, FileSourceSplit>>,
                FileBasedStatisticsReportableInputFormat {

    public OrcBulkDecodingFormat(ReadableConfig formatOptions);

    public BulkFormat<RowData, FileSourceSplit> createRuntimeDecoder(
            DynamicTableSource.Context sourceContext,
            DataType producedDataType,
            int[][] projections);

    public ChangelogMode getChangelogMode();
    public void applyFilters(List<ResolvedExpression> filters);
    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```
## Features
### Projection Pushdown
The format automatically supports column projection, reading only the required columns:
```java
// Only the referenced columns are read from the ORC files:
// order_id and price for the projection, plus customer_id for the predicate
Table result = tableEnv.sqlQuery(
    "SELECT order_id, price FROM orders WHERE customer_id = 12345"
);
```
### Filter Pushdown
Supported filter predicates are automatically pushed down to the ORC reader:
```java
// These predicates are pushed down to the ORC reader, which uses
// stripe and row-group statistics to skip non-matching data
Table result = tableEnv.sqlQuery(
    "SELECT * FROM orders " +
    "WHERE order_date >= DATE '2023-01-01' " +
    "AND price BETWEEN 50.0 AND 500.0 " +
    "AND product_name IS NOT NULL"
);
```
### Statistics Reporting
The format can report table statistics from ORC file metadata:
```java
// Statistics (e.g. row counts) are extracted from ORC file metadata
TableStats stats = bulkDecodingFormat.reportStatistics(files, producedDataType);
```
## Integration with Catalogs
```java
// Using the Hive catalog
HiveCatalog catalog = new HiveCatalog("hive", "default", "/path/to/hive-conf");
tableEnv.registerCatalog("hive", catalog);
tableEnv.useCatalog("hive");

// STORED AS ORC is Hive DDL, so switch to the Hive dialect first
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

// Create an ORC table in the current (Hive) catalog and database
tableEnv.executeSql(
    "CREATE TABLE orc_table (" +
    "  id BIGINT," +
    "  name STRING" +
    ") STORED AS ORC " +
    "LOCATION '/warehouse/orc_table'"
);
```