# Table API Integration

Complete integration with Flink's SQL engine and Table API, providing declarative ORC file access through DDL statements and programmatic table definitions with full support for partitioned tables, filter pushdown, and vectorized processing.

## Capabilities

### Format Factory

The main entry point for integrating ORC format with Flink's Table API and SQL engine.

```java { .api }
/**
 * Factory for creating ORC format in Flink SQL/Table API contexts.
 * Provides both reading and writing capabilities with comprehensive configuration options.
 */
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    /** Format identifier for SQL DDL statements */
    public static final String IDENTIFIER = "orc";

    /** Returns the unique identifier for this format */
    public String factoryIdentifier();

    /** Returns the set of required configuration options */
    public Set<ConfigOption<?>> requiredOptions();

    /** Returns the set of optional configuration options */
    public Set<ConfigOption<?>> optionalOptions();

    /** Creates a decoding format for reading ORC files */
    public BulkDecodingFormat<RowData> createDecodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions
    );

    /** Creates an encoding format for writing ORC files */
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
            DynamicTableFactory.Context context,
            ReadableConfig formatOptions
    );
}
```
**Usage Example:**

```java
// SQL DDL for creating ORC table
tEnv.executeSql(
    "CREATE TABLE sales_data (" +
    "  transaction_id BIGINT," +
    "  user_id BIGINT," +
    "  product_id BIGINT," +
    "  amount DECIMAL(10,2)," +
    "  transaction_time TIMESTAMP(3)," +
    "  region STRING" +
    ") PARTITIONED BY (region) " +
    "WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'hdfs://namenode:port/path/to/sales'," +
    "  'format' = 'orc'," +
    // ORC compression kind names are uppercase (NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD)
    "  'orc.compress' = 'SNAPPY'," +
    // orc.stripe.size takes a byte count; 67108864 bytes = 64 MB
    "  'orc.stripe.size' = '67108864'" +
    ")"
);
```

### Input Format for Bulk Reading

Specialized input format for reading ORC files with vectorized processing and partition support.

```java { .api }
/**
 * Abstract base class for ORC input formats providing vectorized reading capabilities
 * @param <T> The type of records produced by the format
 * @param <BatchT> The type of batch used internally (e.g., VectorizedRowBatch)
 * @param <SplitT> The type of input split
 */
public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT extends FileSourceSplit>
        implements BulkFormat<T, SplitT> {
    /**
     * Constructor for ORC input format
     * @param filePaths Array of file paths to read
     * @param schema ORC schema description
     * @param selectedFields Indices of fields to read (for projection)
     * @param conjunctPredicates List of predicates for pushdown filtering
     * @param batchSize Size of vectorized batches
     * @param orcConfig ORC-specific configuration
     * @param hadoopConfigWrapper Serializable Hadoop configuration
     */
    public AbstractOrcFileInputFormat(
            Path[] filePaths,
            TypeDescription schema,
            int[] selectedFields,
            List<Predicate> conjunctPredicates,
            int batchSize,
            Configuration orcConfig,
            SerializableHadoopConfigWrapper hadoopConfigWrapper
    );

    /** Returns true as ORC files support splitting */
    public boolean isSplittable();

    /** Abstract method to return the produced type information */
    public abstract TypeInformation<T> getProducedType();
}

/**
 * Concrete ORC input format that produces RowData with columnar processing
 */
public class OrcColumnarRowFileInputFormat<BatchT, SplitT extends FileSourceSplit>
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {

    /** Constructor for non-partitioned tables */
    public OrcColumnarRowFileInputFormat(
            Path[] filePaths,
            String[] fieldNames,
            LogicalType[] fieldTypes,
            int[] selectedFields,
            List<Predicate> conjunctPredicates,
            int batchSize,
            Configuration orcConfig,
            SerializableHadoopConfigWrapper hadoopConfigWrapper
    );

    /**
     * Factory method for creating partitioned table format
     * @param orcConfig ORC configuration
     * @param tableType Row type of the table schema
     * @param hadoopConfigWrapper Hadoop configuration
     * @param partitionKeys List of partition column names
     * @param extractor Partition field extractor
     * @param conjunctPredicates Filter predicates
     * @param batchSize Vectorized batch size
     * @param caseSensitive Whether names are case sensitive
     * @return Configured input format for partitioned tables
     */
    public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>
            createPartitionedFormat(
                    Configuration orcConfig,
                    RowType tableType,
                    SerializableHadoopConfigWrapper hadoopConfigWrapper,
                    List<String> partitionKeys,
                    PartitionFieldExtractor<SplitT> extractor,
                    List<Predicate> conjunctPredicates,
                    int batchSize,
                    boolean caseSensitive
            );

    /** Returns RowData type information */
    public TypeInformation<RowData> getProducedType();
}
```
**Usage Example:**

```java
// Programmatic table creation with ORC format
TableDescriptor descriptor = TableDescriptor.forConnector("filesystem")
    .schema(Schema.newBuilder()
        .column("user_id", DataTypes.BIGINT())
        .column("product_id", DataTypes.BIGINT())
        .column("purchase_time", DataTypes.TIMESTAMP_LTZ(3))
        .column("amount", DataTypes.DECIMAL(10, 2))
        // Partition column must be declared in the schema
        .column("region", DataTypes.STRING())
        .build())
    .partitionedBy("region")
    .format("orc")
    .option("path", "/path/to/orc/data")
    .option("orc.compress", "ZSTD")
    .build();

tEnv.createTable("purchases", descriptor);
```

### Split Readers

Internal components for reading ORC file splits with vectorized processing.

```java { .api }
/**
 * Split reader for ORC files that produces RowData from vectorized batches
 */
public class OrcColumnarRowSplitReader<BATCH> extends OrcSplitReader<RowData, BATCH> {
    /**
     * Constructor for columnar row split reader
     * @param orcShim Version compatibility shim
     * @param orcConfig ORC configuration
     * @param fieldNames Array of field names
     * @param fieldTypes Array of logical field types
     * @param selectedFields Indices of selected fields for projection
     * @param conjunctPredicates Filter predicates for pushdown
     * @param batchSize Size of vectorized batches
     * @param split File split to read
     * @param generator Batch generator for creating column batches
     */
    public OrcColumnarRowSplitReader(
            OrcShim<BATCH> orcShim,
            Configuration orcConfig,
            String[] fieldNames,
            LogicalType[] fieldTypes,
            int[] selectedFields,
            List<Predicate> conjunctPredicates,
            int batchSize,
            FileInputSplit split,
            ColumnBatchGenerator<BATCH> generator
    );

    /** Reads the next record as RowData */
    public RowData nextRecord(RowData reuse) throws IOException;

    /**
     * Interface for generating column batches from different batch types
     */
    public interface ColumnBatchGenerator<BATCH> extends Serializable {
        VectorizedColumnBatch generate(BATCH batch);
    }
}

/**
 * Abstract base class for ORC split readers with batch processing capabilities
 */
public abstract class OrcSplitReader<T, BATCH> implements Closeable {
    /**
     * Constructor for ORC split reader
     * @param orcShim Version compatibility shim
     * @param orcConfig ORC configuration
     * @param split File split to read
     * @param batchSize Size of vectorized batches
     */
    public OrcSplitReader(
            OrcShim<BATCH> orcShim,
            Configuration orcConfig,
            FileInputSplit split,
            int batchSize
    );

    /** Seek to a specific row number */
    public void seekToRow(long rowNumber) throws IOException;

    /** Check if the end of input has been reached */
    public boolean reachedEnd();

    /** Abstract method for reading the next record */
    public abstract T nextRecord(T reuse) throws IOException;

    /** Close the reader and release resources */
    public void close() throws IOException;
}
```
### Utility Classes

Helper classes for ORC reader creation and type conversions in Table API contexts.

```java { .api }
/**
 * Utility class for creating ORC readers and performing type conversions
 */
public class OrcSplitReaderUtil {
    /**
     * Generate a partitioned columnar row reader
     * @param orcShim Version compatibility shim
     * @param orcConfig ORC configuration
     * @param fieldNames Field names in the schema
     * @param fieldTypes Logical types of the fields
     * @param selectedFields Selected field indices for projection
     * @param conjunctPredicates Filter predicates
     * @param batchSize Vectorized batch size
     * @param split File split to read
     * @param partitionKeys Partition column names
     * @param defaultPartName Default partition name for null values
     * @param extractor Partition field extractor
     * @return Configured partitioned reader
     */
    public static <SplitT extends FileSourceSplit> OrcColumnarRowSplitReader<VectorizedRowBatch>
            genPartColumnarRowReader(
                    OrcShim<VectorizedRowBatch> orcShim,
                    Configuration orcConfig,
                    String[] fieldNames,
                    LogicalType[] fieldTypes,
                    int[] selectedFields,
                    List<Predicate> conjunctPredicates,
                    int batchSize,
                    SplitT split,
                    List<String> partitionKeys,
                    String defaultPartName,
                    PartitionFieldExtractor<SplitT> extractor
            );

    /** Get selected field indices from ORC schema and projection */
    public static int[] getSelectedOrcFields(
            RowType tableType,
            int[] selectedFields,
            List<String> partitionKeys
    );

    /** Filter out partition column names from schema */
    public static List<String> getNonPartNames(List<String> fieldNames, List<String> partitionKeys);

    /** Convert Flink row type to ORC type with partition support */
    public static TypeDescription convertToOrcTypeWithPart(RowType type, List<String> partitionKeys);

    /** Convert Flink logical type to ORC TypeDescription */
    public static TypeDescription logicalTypeToOrcType(LogicalType type);
}
```
**Usage Example:**

```java
// Query with partition pruning and filter pushdown
tEnv.executeSql(
    "SELECT user_id, SUM(amount) AS total_spent " +
    "FROM sales_data " +
    "WHERE region = 'US' AND transaction_time > TIMESTAMP '2023-01-01 00:00:00' " +
    "GROUP BY user_id " +
    // HAVING cannot reference a SELECT alias in standard SQL; repeat the aggregate
    "HAVING SUM(amount) > 1000"
).print();
```