# Bulk Writing

The ORC format provides high-performance bulk writing capabilities through the `OrcBulkWriterFactory` and custom `Vectorizer` implementations. This enables efficient writing of large datasets to ORC files with full control over the vectorization process.

## Writer Factory

```java { .api }
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);

    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;

    @VisibleForTesting
    protected OrcFile.WriterOptions getWriterOptions();
}
```
**Factory Method Usage:**

```java
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;

// Simple factory with vectorizer only
RowDataVectorizer vectorizer = new RowDataVectorizer(schema, fieldTypes);
OrcBulkWriterFactory<RowData> simpleFactory = new OrcBulkWriterFactory<>(vectorizer);

// Factory with Hadoop configuration
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");
OrcBulkWriterFactory<RowData> configuredFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    hadoopConfig
);

// Factory with writer properties and configuration
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");
writerProps.setProperty("orc.stripe.size", "134217728");
OrcBulkWriterFactory<RowData> fullFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    writerProps,
    hadoopConfig
);
```
## Vectorizer Base Class

```java { .api }
@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
    public Vectorizer(String schema);

    public TypeDescription getSchema();
    public void setWriter(Writer writer);
    public void addUserMetadata(String key, ByteBuffer value);

    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}
```

## RowData Vectorizer

```java { .api }
public class RowDataVectorizer extends Vectorizer<RowData> {
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);

    public void vectorize(RowData row, VectorizedRowBatch batch);
}
```
## Usage Examples

### Basic RowData Writing

```java
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;

// Define schema and field types
LogicalType[] fieldTypes = {
    new BigIntType(),       // id
    new VarCharType(255),   // name
    new IntType(),          // age
    new DecimalType(10, 2), // salary
    new BooleanType(),      // active
    new TimestampType(3)    // created_at
};

String orcSchema = "struct<" +
    "id:bigint," +
    "name:string," +
    "age:int," +
    "salary:decimal(10,2)," +
    "active:boolean," +
    "created_at:timestamp" +
    ">";

// Create vectorizer
RowDataVectorizer vectorizer = new RowDataVectorizer(orcSchema, fieldTypes);

// Create writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Use in streaming sink
DataStream<RowData> dataStream = ...; // your data stream
dataStream.addSink(
    StreamingFileSink.forBulkFormat(
        new Path("/path/to/output"),
        writerFactory
    ).build()
);
```
### Writing with Configuration

```java
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;

// Configure ORC writer properties
Properties writerProperties = new Properties();
writerProperties.setProperty("orc.compress", "SNAPPY");
writerProperties.setProperty("orc.stripe.size", "67108864");
writerProperties.setProperty("orc.row.index.stride", "10000");

// Configure Hadoop settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("orc.bloom.filter.columns", "name,id");
hadoopConfig.setFloat("orc.bloom.filter.fpp", 0.05f);

// Create configured writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    writerProperties,
    hadoopConfig
);
```

### Complex Types Writing

```java
// Schema with complex types
LogicalType[] complexFieldTypes = {
    new BigIntType(),                                // id
    new ArrayType(new VarCharType(100)),             // tags array
    new MapType(new VarCharType(50), new IntType()), // metrics map
    new RowType(Arrays.asList(                       // address struct
        new RowType.RowField("street", new VarCharType(200)),
        new RowType.RowField("city", new VarCharType(100)),
        new RowType.RowField("zip", new VarCharType(10))
    ))
};

String complexOrcSchema = "struct<" +
    "id:bigint," +
    "tags:array<string>," +
    "metrics:map<string,int>," +
    "address:struct<street:string,city:string,zip:string>" +
    ">";

RowDataVectorizer complexVectorizer = new RowDataVectorizer(complexOrcSchema, complexFieldTypes);
```
## Custom Vectorizer Implementation

```java
// Custom vectorizer for POJO classes
public class UserVectorizer extends Vectorizer<User> {

    public UserVectorizer() {
        super("struct<id:bigint,name:string,email:string,active:boolean>");
    }

    @Override
    public void vectorize(User user, VectorizedRowBatch batch) throws IOException {
        int rowId = batch.size++;

        // Set ID column
        ((LongColumnVector) batch.cols[0]).vector[rowId] = user.getId();

        // Set name column
        byte[] nameBytes = user.getName().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[1]).setVal(rowId, nameBytes);

        // Set email column
        byte[] emailBytes = user.getEmail().getBytes(StandardCharsets.UTF_8);
        ((BytesColumnVector) batch.cols[2]).setVal(rowId, emailBytes);

        // Set active column
        ((LongColumnVector) batch.cols[3]).vector[rowId] = user.isActive() ? 1 : 0;

        // Add custom metadata
        if (user.hasSpecialAttribute()) {
            addUserMetadata("special_users", ByteBuffer.wrap("true".getBytes(StandardCharsets.UTF_8)));
        }
    }
}

// Usage
UserVectorizer userVectorizer = new UserVectorizer();
OrcBulkWriterFactory<User> userWriterFactory = new OrcBulkWriterFactory<>(userVectorizer);
```
## Bulk Writer Implementation

```java { .api }
@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
    public void addElement(T element) throws IOException;
    public void flush() throws IOException;
    public void finish() throws IOException;
}
```

The `OrcBulkWriter` handles the actual writing process:

- **addElement()**: Vectorizes and buffers elements, writing batches when full
- **flush()**: Forces a write of any pending batched data
- **finish()**: Finalizes and closes the ORC file
## Performance Considerations
230
231
### Batch Size Optimization
232
233
```java
234
// ORC uses VectorizedRowBatch with default size
235
VectorizedRowBatch batch = schema.createRowBatch(); // Default: 1024 rows
236
VectorizedRowBatch customBatch = schema.createRowBatch(2048); // Custom size
237
```
238
239
### Compression Settings
240
241
```java
242
Properties props = new Properties();
243
props.setProperty("orc.compress", "SNAPPY"); // Fast compression
244
props.setProperty("orc.compress", "ZLIB"); // Better compression ratio
245
props.setProperty("orc.compress", "ZSTD"); // Best compression ratio
246
props.setProperty("orc.compress", "NONE"); // No compression
247
```
248
249
### Stripe Size Configuration
250
251
```java
252
Properties props = new Properties();
253
props.setProperty("orc.stripe.size", "134217728"); // 128MB stripes
254
props.setProperty("orc.row.index.stride", "20000"); // Index every 20k rows
255
```
## Writer Lifecycle

1. **Factory Creation**: `OrcBulkWriterFactory` configured with vectorizer and options
2. **Writer Instantiation**: `create()` method called with output stream
3. **Element Processing**: `addElement()` vectorizes and batches data
4. **Batch Writing**: Full batches automatically written to ORC file
5. **Finalization**: `finish()` flushes remaining data and closes file

## Error Handling

```java
try {
    writerFactory.create(outputStream);
} catch (IOException e) {
    // Handle writer creation errors
    log.error("Failed to create ORC writer", e);
}

// Vectorizer error handling
@Override
public void vectorize(MyData data, VectorizedRowBatch batch) throws IOException {
    try {
        // Vectorization logic
    } catch (Exception e) {
        throw new IOException("Failed to vectorize data: " + data, e);
    }
}
```