Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files
npx @tessl/cli install tessl/maven-org-apache-flink--flink-orc@2.1.0
# Apache Flink ORC Format Connector

The Apache Flink ORC format connector provides comprehensive support for reading and writing ORC (Optimized Row Columnar) data files within the Flink ecosystem. It enables high-performance columnar data processing with vectorized reading, predicate pushdown, Table API integration, and complete type mapping between Flink and ORC.

## Package Information

- **Package Name**: flink-orc
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-orc
- **Version**: 2.1.0
- **Installation**: Add the dependency to your `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc</artifactId>
    <version>2.1.0</version>
</dependency>
```

## Core Imports

```java
// Table API - main format factory
import org.apache.flink.orc.OrcFileFormatFactory;

// Writer API
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.vector.RowDataVectorizer;

// Reader API
import org.apache.flink.orc.OrcColumnarRowInputFormat;

// Filtering
import org.apache.flink.orc.OrcFilters;
```

## Basic Usage

### Table API Integration

The ORC format integrates seamlessly with Flink's Table API through the format identifier `"orc"`:

```java
// Create table with ORC format
TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

tableEnv.executeSql(
    "CREATE TABLE users (" +
    "  id BIGINT," +
    "  name STRING," +
    "  age INT," +
    "  active BOOLEAN" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/files'," +
    "  'format' = 'orc'" +
    ")"
);

// Query ORC data
Table result = tableEnv.sqlQuery("SELECT * FROM users WHERE active = true");
```

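
Writer behavior can be tuned in the same `WITH` clause: the Flink filesystem connector forwards table options prefixed with `orc.` to the ORC writer. A sketch (the table name, path, and the `SNAPPY` codec choice are illustrative):

```java
// Illustrative sketch: options prefixed with "orc." are passed through
// to the underlying ORC writer (here: the compression codec).
tableEnv.executeSql(
    "CREATE TABLE users_snappy (" +
    "  id BIGINT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/output'," +
    "  'format' = 'orc'," +
    "  'orc.compress' = 'SNAPPY'" +
    ")"
);
```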
### DataStream API - Writing ORC Files

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BooleanType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;

// Create vectorizer for RowData
LogicalType[] fieldTypes = {
    new BigIntType(),
    new VarCharType(255),
    new IntType(),
    new BooleanType()
};

RowDataVectorizer vectorizer = new RowDataVectorizer(
    "struct<id:bigint,name:string,age:int,active:boolean>",
    fieldTypes
);

// Create ORC writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Use in a file sink (StreamingFileSink was removed in Flink 2.x; use FileSink)
DataStream<RowData> dataStream = // ... your data stream
dataStream.sinkTo(
    FileSink.forBulkFormat(
        new Path("/path/to/output"),
        writerFactory
    ).build()
);
```

## Architecture

The Flink ORC connector is organized into several key components:

- **Format Factory**: Central integration point with Flink's Table API (`OrcFileFormatFactory`)
- **Reader Layer**: Vectorized columnar reading with predicate pushdown (`OrcColumnarRowInputFormat`)
- **Writer Layer**: Bulk writing with custom vectorization support (`OrcBulkWriterFactory`, `Vectorizer`)
- **Vector System**: Column vector implementations for all supported data types
- **Filter System**: Predicate pushdown with ORC-native filtering (`OrcFilters`)
- **Type System**: Complete mapping between Flink and ORC type systems
- **Utility Layer**: Statistics, configuration, and compatibility utilities

## Capabilities

### Table API Integration

Main integration point for the ORC format in Flink's Table API and SQL.

```java { .api }
public class OrcFileFormatFactory implements BulkReaderFormatFactory, BulkWriterFormatFactory {
    public static final String IDENTIFIER = "orc";

    public String factoryIdentifier();
    public BulkDecodingFormat<RowData> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions);
}
```

[Table API Integration](./table-api.md)

### Bulk Writing

High-performance bulk writing of data to ORC files with custom vectorization.

```java { .api }
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);

    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
}
```

```java { .api }
@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
    public Vectorizer(String schema);
    public TypeDescription getSchema();
    public void addUserMetadata(String key, ByteBuffer value);
    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
```

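
For element types other than `RowData`, `Vectorizer` can be subclassed directly. The sketch below follows the pattern shown in the Flink documentation for a hypothetical `Person` POJO (the `Person` class and its getters are assumptions, not part of this library):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.orc.vector.Vectorizer;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Vectorizer for a hypothetical Person POJO with name/age fields
public class PersonVectorizer extends Vectorizer<Person> {

    public PersonVectorizer(String schema) {
        super(schema); // e.g. "struct<name:string,age:int>"
    }

    @Override
    public void vectorize(Person element, VectorizedRowBatch batch) throws IOException {
        BytesColumnVector nameColVector = (BytesColumnVector) batch.cols[0];
        LongColumnVector ageColVector = (LongColumnVector) batch.cols[1];
        int row = batch.size++; // claim the next row slot in the batch
        nameColVector.setVal(row, element.getName().getBytes(StandardCharsets.UTF_8));
        ageColVector.vector[row] = element.getAge();
    }
}
```

The column order in `batch.cols` must match the field order of the struct schema passed to the constructor.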
[Bulk Writing](./bulk-writing.md)

### Columnar Reading

Vectorized columnar reading with partition support and statistics reporting.

```java { .api }
public class OrcColumnarRowInputFormat<BatchT, SplitT extends FileSourceSplit>
        extends AbstractOrcFileInputFormat<RowData, BatchT, SplitT>
        implements FileBasedStatisticsReportableInputFormat {

    public static <SplitT extends FileSourceSplit> OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT>
            createPartitionedFormat(
                OrcShim<VectorizedRowBatch> shim,
                Configuration hadoopConfig,
                RowType tableType,
                List<String> partitionKeys,
                PartitionFieldExtractor<SplitT> extractor,
                int[] selectedFields,
                List<OrcFilters.Predicate> conjunctPredicates,
                int batchSize,
                Function<RowType, TypeInformation<RowData>> rowTypeInfoFactory
            );

    public TableStats reportStatistics(List<Path> files, DataType producedDataType);
}
```

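
For DataStream jobs, the factory method above can be wired into a `FileSource`. A minimal sketch, assuming a two-column schema, no partition columns (hence the `null` extractor), and `InternalTypeInfo::of` as the type-information factory:

```java
import java.util.Collections;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.OrcColumnarRowInputFormat;
import org.apache.flink.orc.shim.OrcShim;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

RowType tableType = RowType.of(
    new LogicalType[] {new BigIntType(), new VarCharType(255)},
    new String[] {"id", "name"});

OrcColumnarRowInputFormat<VectorizedRowBatch, FileSourceSplit> format =
    OrcColumnarRowInputFormat.createPartitionedFormat(
        OrcShim.defaultShim(),       // bundled ORC implementation
        new Configuration(),         // Hadoop configuration
        tableType,
        Collections.emptyList(),     // no partition keys
        null,                        // no partition field extractor needed (assumption)
        new int[] {0, 1},            // project both fields
        Collections.emptyList(),     // no pushed-down predicates
        2048,                        // rows per vectorized batch
        InternalTypeInfo::of);       // TypeInformation factory

FileSource<RowData> source =
    FileSource.forBulkFileFormat(format, new Path("/path/to/orc/files")).build();
```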
[Columnar Reading](./columnar-reading.md)

### Predicate Pushdown

Advanced filtering capabilities with ORC-native predicate pushdown.

```java { .api }
public class OrcFilters {
    public static Predicate toOrcPredicate(Expression expression);

    public abstract static class Predicate implements Serializable {
        public abstract SearchArgument.Builder add(SearchArgument.Builder builder);
    }
}
```

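
Predicates are typically produced from Table API expressions via `toOrcPredicate`, but they can also be composed by hand from the nested predicate classes (`Equals`, `LessThan`, `Or`, ...). A sketch, assuming those constructors take a column name, a `PredicateLeaf.Type`, and a literal:

```java
import org.apache.flink.orc.OrcFilters;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;

// age < 18 OR name = 'alice' (column names and literals are illustrative)
OrcFilters.Predicate predicate =
    new OrcFilters.Or(
        new OrcFilters.LessThan("age", PredicateLeaf.Type.LONG, 18L),
        new OrcFilters.Equals("name", PredicateLeaf.Type.STRING, "alice"));

// Passing such predicates to OrcColumnarRowInputFormat as conjunct predicates
// lets ORC skip stripes and row groups whose statistics cannot match the filter.
```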
[Predicate Pushdown](./predicate-pushdown.md)

### Vector Processing

Low-level column vector system for high-performance data processing.

```java { .api }
public abstract class AbstractOrcColumnVector {
    public static ColumnVector createFlinkVector(org.apache.hadoop.hive.ql.exec.vector.ColumnVector orcVector, LogicalType type);
    public static ColumnVector createFlinkVectorFromConstant(LogicalType type, Object value, int batchSize);
}
```

[Vector Processing](./vector-processing.md)

## Types

### Core Types

```java { .api }
// Vectorizer for RowData
public class RowDataVectorizer extends Vectorizer<RowData> {
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);
    public void vectorize(RowData row, VectorizedRowBatch batch);
}

// Bulk writer implementation
@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}

// Statistics reporting utility
public class OrcFormatStatisticsReportUtil {
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType);
    public static TableStats getTableStatistics(List<Path> files, DataType producedDataType, Configuration hadoopConfig);
}

// Configuration wrapper for serialization
public class SerializableHadoopConfigWrapper implements Serializable {
    public SerializableHadoopConfigWrapper(Configuration configuration);
    public Configuration get();
}
```

### Supported Data Types

The connector supports comprehensive type mapping between Flink and ORC:

- **Primitive Types**: `BOOLEAN`, `TINYINT`, `SMALLINT`, `INTEGER`, `BIGINT`, `FLOAT`, `DOUBLE`
- **String Types**: `CHAR`, `VARCHAR`
- **Binary Types**: `BINARY`, `VARBINARY`
- **Decimal Types**: `DECIMAL` with precision and scale
- **Temporal Types**: `DATE`, `TIMESTAMP_WITHOUT_TIME_ZONE`, `TIMESTAMP_WITH_LOCAL_TIME_ZONE`
- **Complex Types**: `ARRAY`, `MAP`, `ROW` (nested structures)
- **Null Handling**: Full null value support across all types
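
As an illustration of the mapping above, a table mixing decimal, temporal, and nested types can be declared directly over ORC files (the table name, columns, and path are placeholders):

```java
// Illustrative DDL exercising DECIMAL, TIMESTAMP, ARRAY, MAP, and ROW types
tableEnv.executeSql(
    "CREATE TABLE orders (" +
    "  order_id BIGINT," +
    "  amount DECIMAL(10, 2)," +
    "  created_at TIMESTAMP(3)," +
    "  tags ARRAY<STRING>," +
    "  attributes MAP<STRING, STRING>," +
    "  shipping ROW<street STRING, zip_code INT>" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/orc/orders'," +
    "  'format' = 'orc'" +
    ")"
);
```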