# Apache Flink SQL ORC Format Connector

The Apache Flink SQL ORC format connector provides comprehensive support for reading and writing ORC (Optimized Row Columnar) files in Flink's Table API and SQL. The package is a shaded JAR that bundles the core flink-orc module together with all dependencies needed for ORC file format integration in distributed processing environments.

## Package Information

- **Package Name**: flink-sql-orc_2.12
- **Package Type**: Maven
- **Language**: Java
- **Installation**: Add to your Maven dependencies: `org.apache.flink:flink-sql-orc_2.12:1.14.6`

## Core Imports

```java
import org.apache.flink.orc.OrcFileFormatFactory;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
```

For specific functionality:

```java
// SQL/Table API format factory
import org.apache.flink.orc.OrcFileFormatFactory;

// DataStream API writing
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;

// Filter pushdown
import org.apache.flink.orc.OrcFilters;

// Input formats and readers
import org.apache.flink.orc.OrcColumnarRowFileInputFormat;
import org.apache.flink.orc.AbstractOrcFileInputFormat;
```

## Basic Usage

### SQL/Table API Integration

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Create a table with the ORC format
tEnv.executeSql(
    "CREATE TABLE orc_table (" +
    " user_id BIGINT," +
    " item_id BIGINT," +
    " category_id BIGINT," +
    " behavior STRING," +
    " ts TIMESTAMP(3)" +
    ") WITH (" +
    " 'connector' = 'filesystem'," +
    " 'path' = 'file:///path/to/orc/files'," +
    " 'format' = 'orc'" +
    ")"
);

// Query the ORC table
tEnv.executeSql("SELECT user_id, COUNT(*) FROM orc_table GROUP BY user_id").print();
```

### DataStream API Writing

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

// Create a vectorizer for the target ORC schema
String schema = "struct<user_id:bigint,item_id:bigint,behavior:varchar(50)>";
RowDataVectorizer vectorizer = new RowDataVectorizer(schema, new LogicalType[]{
    new BigIntType(), new BigIntType(), new VarCharType(50)
});

// Create the ORC bulk writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Create a file sink that writes ORC in bulk format
StreamingFileSink<RowData> sink = StreamingFileSink
    .forBulkFormat(new Path("file:///path/to/output"), writerFactory)
    .build();

// Write the data stream to ORC files
DataStream<RowData> dataStream = // ... your data stream
dataStream.addSink(sink);
```

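Writer behavior such as the compression codec can be tuned through the `Properties`-based factory constructor shown in the Capabilities section below. A minimal sketch, reusing the `vectorizer` from the example above and assuming the standard ORC `orc.compress` property key (the SNAPPY choice is illustrative):

```java
import java.util.Properties;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;

// ORC writer properties; 'orc.compress' selects the compression codec
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");

// Factory variant that takes writer properties plus a Hadoop configuration
OrcBulkWriterFactory<RowData> compressedFactory =
    new OrcBulkWriterFactory<>(vectorizer, writerProps, new Configuration());
```
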
## Architecture

The ORC format connector is built around several key architectural components:

- **Format Integration Layer**: `OrcFileFormatFactory` plugs into Flink's SQL/Table API format discovery system
- **Vectorized Processing**: High-performance columnar processing through `VectorizedRowBatch` and column vector abstractions
- **Version Compatibility**: A shim layer (`OrcShim`) maintains compatibility across Hive/ORC dependency versions (2.0.x, 2.1.x, 2.3+)
- **Filter Pushdown**: Predicates are pushed down to the ORC reader so non-matching data can be skipped at the file level
- **Partition Support**: Native support for partitioned tables and partition pruning
- **Bulk Processing**: Efficient batch reading and writing optimized for large-scale data processing

## Capabilities

### SQL and Table API Integration

Complete integration with Flink's SQL engine and Table API, providing declarative ORC file access through DDL statements and programmatic table definitions.

```java { .api }
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";

    public String factoryIdentifier();
    public Set<ConfigOption<?>> requiredOptions();
    public Set<ConfigOption<?>> optionalOptions();
    public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}
```

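In DDL, the format is selected with `'format' = 'orc'`; table options carrying the `orc.` prefix are passed through to the ORC writer. A minimal sketch, reusing `tEnv` from Basic Usage (the `orc.compress` key is a standard ORC property; the codec choice is illustrative):

```java
// Options with the 'orc.' prefix are forwarded to the ORC writer
tEnv.executeSql(
    "CREATE TABLE compressed_orc_table (" +
    " user_id BIGINT," +
    " behavior STRING" +
    ") WITH (" +
    " 'connector' = 'filesystem'," +
    " 'path' = 'file:///path/to/output'," +
    " 'format' = 'orc'," +
    " 'orc.compress' = 'SNAPPY'" +
    ")"
);
```
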
[Table API Integration](./table-api.md)

### DataStream API Integration

Direct integration with Flink's DataStream API for programmatic ORC reading and writing, supporting custom data processing pipelines and streaming applications.

```java { .api }
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration writerConfiguration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration hadoopConfiguration);

    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}

public abstract class Vectorizer<T> {
    public Vectorizer(String schema);
    public TypeDescription getSchema();
    public void addUserMetadata(String key, ByteBuffer value);
    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
```

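To write element types other than `RowData`, `Vectorizer` is subclassed and each element's fields are copied into the batch's column vectors. A minimal sketch, assuming a hypothetical `Person` POJO with `getName()` and `getAge()` accessors:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Person is a hypothetical POJO used purely for illustration
public class PersonVectorizer extends Vectorizer<Person> {

    public PersonVectorizer(String schema) {
        super(schema); // e.g. "struct<name:string,age:int>"
    }

    @Override
    public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
        BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
        LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
        int row = batch.size++; // claim the next row slot in the batch
        nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
        ageColVector.vector[row] = element.getAge();
    }
}
```
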
[DataStream API Integration](./datastream-api.md)

### Configuration and Advanced Features

Comprehensive configuration options for performance tuning, compression settings, and advanced ORC features, including custom filters and metadata handling.

```java { .api }
public class OrcFilters {
    public static Predicate toOrcPredicate(Expression expression);

    public abstract static class Predicate implements Serializable { }
    public static class Equals extends BinaryPredicate { }
    public static class LessThan extends BinaryPredicate { }
    public static class LessThanEquals extends BinaryPredicate { }
    public static class IsNull extends ColumnPredicate { }
    public static class Between extends ColumnPredicate { }
    public static class In extends ColumnPredicate { }
}
```

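Predicates can also be constructed directly from the nested classes above and handed to a reader as conjunctive (AND-ed) filters. A minimal sketch, assuming the constructors take a column name, a `PredicateLeaf.Type` literal type, and literal value(s) (the column names and bounds are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// user_id = 42
OrcFilters.Predicate idFilter =
    new OrcFilters.Equals("user_id", PredicateLeaf.Type.LONG, 42L);

// item_id BETWEEN 100 AND 200
OrcFilters.Predicate rangeFilter =
    new OrcFilters.Between("item_id", PredicateLeaf.Type.LONG, 100L, 200L);

// Conjunctive filters: every predicate must hold for a row group to be read
List<OrcFilters.Predicate> conjunctPredicates = Arrays.asList(idFilter, rangeFilter);
```
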
[Configuration and Advanced Usage](./configuration.md)

## Types

Core type definitions used throughout the ORC connector:

```java { .api }
// Input format for reading ORC files
public abstract class AbstractOrcFileInputFormat<T, BatchT, SplitT> implements BulkFormat<T, SplitT> {
    public AbstractOrcFileInputFormat(
        Path[] filePaths,
        TypeDescription schema,
        int[] selectedFields,
        List<Predicate> conjunctPredicates,
        int batchSize,
        Configuration orcConfig,
        SerializableHadoopConfigWrapper hadoopConfigWrapper
    );

    public boolean isSplittable();
    public abstract TypeInformation<T> getProducedType();
}

// Concrete implementation for RowData
public class OrcColumnarRowFileInputFormat<BatchT, SplitT> extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT> {
    public static <SplitT extends FileSourceSplit> OrcColumnarRowFileInputFormat<VectorizedRowBatch, SplitT>
    createPartitionedFormat(
        Configuration orcConfig,
        RowType tableType,
        SerializableHadoopConfigWrapper hadoopConfigWrapper,
        List<String> partitionKeys,
        PartitionFieldExtractor<SplitT> extractor,
        List<Predicate> conjunctPredicates,
        int batchSize,
        boolean caseSensitive
    );
}

// Version compatibility interface
public interface OrcShim<BATCH> extends Serializable {
    RecordReader createRecordReader(
        Configuration conf,
        TypeDescription schema,
        int[] selectedFields,
        List<OrcFilters.Predicate> conjunctPredicates,
        org.apache.flink.core.fs.Path path,
        long splitStart,
        long splitLength
    ) throws IOException;

    OrcVectorizedBatchWrapper<BATCH> createBatchWrapper(TypeDescription schema, int batchSize);
    boolean nextBatch(RecordReader reader, BATCH batch) throws IOException;

    static OrcShim<VectorizedRowBatch> defaultShim();
    static OrcShim<VectorizedRowBatch> createShim(String hiveDependencyVersion);
}
```
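
The shim is normally chosen automatically, but it can be selected explicitly when the Hive dependency on the classpath differs from the default. A minimal sketch using the factory methods above, assuming `OrcShim` lives in `org.apache.flink.orc.shim` (the version string is illustrative):

```java
import org.apache.flink.orc.shim.OrcShim;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Default shim, targeting the bundled Hive/ORC dependency (2.3+)
OrcShim<VectorizedRowBatch> shim = OrcShim.defaultShim();

// Explicit shim for an older Hive dependency on the classpath
OrcShim<VectorizedRowBatch> legacyShim = OrcShim.createShim("2.1.1");
```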