# Bulk Writing

Factory and implementation for creating ORC bulk writers that efficiently write Flink RowData to ORC files without Hive dependencies. Provides high-performance batch writing with automatic type conversion and memory management.
## Capabilities

### ORC Bulk Writer Factory

Factory class that creates BulkWriter instances for writing RowData to ORC files using the no-hive ORC implementation.

```java { .api }
/**
 * Factory for creating ORC bulk writers without Hive dependencies.
 * Implements BulkWriter.Factory<RowData> for integration with Flink's file sinks.
 */
public class OrcNoHiveBulkWriterFactory implements BulkWriter.Factory<RowData> {

    /**
     * Creates a new ORC bulk writer factory.
     * @param conf Hadoop configuration for ORC file settings
     * @param schema ORC schema string (e.g., "struct<name:string,age:int>")
     * @param fieldTypes Array of Flink logical types matching the schema fields
     */
    public OrcNoHiveBulkWriterFactory(Configuration conf, String schema, LogicalType[] fieldTypes);

    /**
     * Creates a BulkWriter instance for the given output stream.
     * @param out Output stream to write ORC data to
     * @return BulkWriter instance for writing RowData
     * @throws IOException if writer creation fails
     */
    public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException;
}
```

**Usage Examples:**

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.nohive.OrcNoHiveBulkWriterFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;

// Define schema and types for a user record
String orcSchema = "struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>";
LogicalType[] fieldTypes = {
    new BigIntType(),
    new VarCharType(255),
    new VarCharType(255),
    new IntType(),
    new DecimalType(10, 2)
};

// Create factory with Hadoop configuration
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("orc.compress", "ZLIB");        // Optional compression
hadoopConfig.setInt("orc.row.batch.size", 1024); // Optional batch size

OrcNoHiveBulkWriterFactory factory = new OrcNoHiveBulkWriterFactory(
    hadoopConfig,
    orcSchema,
    fieldTypes
);

// Use with StreamingFileSink. Bulk-encoded formats roll part files on every
// checkpoint, so only checkpoint-based rolling policies are supported here.
StreamingFileSink<RowData> sink = StreamingFileSink
    .forBulkFormat(outputPath, factory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

dataStream.addSink(sink);
```

```java
// Complex nested schema example
String complexSchema = "struct<" +
    "user_id:bigint," +
    "profile:struct<name:string,bio:string>," +
    "tags:array<string>," +
    "metrics:map<string,double>" +
    ">";

LogicalType[] complexTypes = {
    new BigIntType(),
    RowType.of(new VarCharType(100), new VarCharType(500)),
    new ArrayType(new VarCharType(50)),
    new MapType(new VarCharType(50), new DoubleType())
};

OrcNoHiveBulkWriterFactory complexFactory = new OrcNoHiveBulkWriterFactory(
    hadoopConfig,
    complexSchema,
    complexTypes
);
```

### BulkWriter Interface

The factory creates BulkWriter instances with the following interface:

```java { .api }
/**
 * BulkWriter interface for writing elements in bulk to an output stream.
 * OrcNoHiveBulkWriterFactory.create() returns a BulkWriter<RowData>.
 */
public interface BulkWriter<T> {

    /**
     * Adds a single element to the ORC file.
     * @param element element to write
     * @throws IOException if the write operation fails
     */
    void addElement(T element) throws IOException;

    /**
     * Flushes any buffered data to the output stream.
     * @throws IOException if the flush operation fails
     */
    void flush() throws IOException;

    /**
     * Finishes writing and finalizes the ORC file.
     * @throws IOException if finishing fails
     */
    void finish() throws IOException;
}
```
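The lifecycle above (addElement, then flush, then finish) can be illustrated with a small, dependency-free sketch of the same three-method contract. `SimpleBulkWriter` and `LineBulkWriter` are hypothetical names used only for this illustration; they are not Flink classes:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

/** Stand-in for the BulkWriter contract shown above (illustration only). */
interface SimpleBulkWriter<T> {
    void addElement(T element) throws IOException;
    void flush() throws IOException;
    void finish() throws IOException;
}

/** Toy implementation: buffers rows in memory, writes them out on flush. */
class LineBulkWriter implements SimpleBulkWriter<String> {
    private final OutputStream out;
    private final StringBuilder buffer = new StringBuilder();

    LineBulkWriter(OutputStream out) {
        this.out = out;
    }

    @Override
    public void addElement(String element) {
        buffer.append(element).append('\n'); // buffered until the next flush
    }

    @Override
    public void flush() throws IOException {
        out.write(buffer.toString().getBytes(StandardCharsets.UTF_8));
        buffer.setLength(0);
        out.flush();
    }

    @Override
    public void finish() throws IOException {
        flush();     // finish implies a final flush...
        out.close(); // ...and finalizes the output
    }
}

public class BulkWriterContractDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        SimpleBulkWriter<String> writer = new LineBulkWriter(sink);
        writer.addElement("row-1");
        writer.addElement("row-2");
        writer.finish(); // flushes buffered rows and closes the stream
        System.out.print(sink.toString(StandardCharsets.UTF_8));
    }
}
```

After finish() a writer must not be reused; Flink's file sinks call finish() when a part file is rolled and create a fresh writer for the next part.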

### Physical Writer Implementation

Internal implementation that handles ORC file writing with relocated Protobuf classes for no-hive compatibility.

```java { .api }
/**
 * Physical writer implementation for ORC files without Hive dependencies.
 * Handles the relocated Protobuf classes in orc-core-nohive.
 */
public class NoHivePhysicalWriterImpl extends PhysicalWriterImpl {

    /**
     * Creates a new no-hive physical writer.
     * @param out Output stream to write to
     * @param opts ORC writer options and configuration
     * @throws IOException if writer initialization fails
     */
    public NoHivePhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) throws IOException;

    /**
     * Writes ORC metadata using the relocated protobuf classes.
     * @param metadata ORC metadata to write
     * @throws IOException if the write operation fails
     */
    protected void writeMetadata(OrcProto.Metadata metadata) throws IOException;

    /**
     * Writes the ORC file footer using the relocated protobuf classes.
     * @param footer ORC file footer to write
     * @throws IOException if the write operation fails
     */
    protected void writeFileFooter(OrcProto.Footer footer) throws IOException;

    /**
     * Writes an ORC stripe footer using the relocated protobuf classes.
     * @param footer ORC stripe footer to write
     * @throws IOException if the write operation fails
     */
    protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException;
}
```

## Type Conversion

The bulk writer automatically converts Flink logical types to ORC column vectors:

| Flink Type | ORC Vector Type | Conversion Notes |
|------------|-----------------|------------------|
| BOOLEAN | LongColumnVector | 1 for true, 0 for false |
| TINYINT, SMALLINT, INTEGER, BIGINT | LongColumnVector | Direct mapping |
| FLOAT, DOUBLE | DoubleColumnVector | Direct mapping |
| CHAR, VARCHAR | BytesColumnVector | UTF-8 encoded |
| BINARY, VARBINARY | BytesColumnVector | Direct byte array |
| DECIMAL | DecimalColumnVector | Uses HiveDecimal for precision |
| DATE | LongColumnVector | Days since epoch |
| TIMESTAMP_* | TimestampColumnVector | Millisecond time plus nanos-of-second field |
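A few of the scalar mappings above can be reproduced with plain Java to show the values that end up in the vectors. This is a sketch of the table's rules using only the standard library, not the writer's actual conversion code:

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;

public class OrcValueMappingDemo {
    public static void main(String[] args) {
        // BOOLEAN -> LongColumnVector: true is stored as 1, false as 0
        boolean flag = true;
        long boolAsLong = flag ? 1L : 0L;

        // DATE -> LongColumnVector: days since the Unix epoch (1970-01-01)
        long epochDays = LocalDate.of(2024, 1, 1).toEpochDay();

        // CHAR/VARCHAR -> BytesColumnVector: UTF-8 encoded bytes
        byte[] utf8 = "héllo".getBytes(StandardCharsets.UTF_8);

        System.out.println(boolAsLong);  // 1
        System.out.println(epochDays);   // 19723
        System.out.println(utf8.length); // 6: 'é' takes two bytes in UTF-8
    }
}
```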

## Configuration Options

Configure the ORC writer through the Hadoop Configuration:

```java
Configuration config = new Configuration();

// Compression settings
config.set("orc.compress", "ZLIB");        // NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
config.set("orc.compress.size", "262144"); // 256 KB compression blocks

// Performance settings
config.setInt("orc.row.batch.size", 1024);   // Rows per batch
config.setInt("orc.stripe.size", 67108864);  // 64 MB stripes
config.setBoolean("orc.use.zerocopy", true); // Enable zero-copy reads

// Memory settings
config.setDouble("orc.dictionary.key.threshold", 0.8); // Dictionary encoding threshold
```

## Memory Management

The bulk writer manages memory efficiently through:

- **Batch Processing**: Writes data in configurable batch sizes (default 1024 rows)
- **Automatic Flushing**: Flushes batches when full to prevent memory buildup
- **Stream Management**: Properly closes underlying streams and releases resources
- **Vector Reuse**: Reuses vectorized row batches across write operations
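The batch-and-flush behavior described above can be sketched in plain Java: rows accumulate in a fixed-capacity, reused batch, a write is triggered automatically whenever the batch fills, and one final flush drains the remainder. `RowBatchBuffer` is an illustrative name and the logic is a simplified stand-in, not Flink's internal code:

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative batch buffer: fixed capacity, reused across flushes. */
public class RowBatchBuffer {
    private final int batchSize;
    private final List<String> batch; // stands in for a VectorizedRowBatch
    private int flushCount = 0;

    public RowBatchBuffer(int batchSize) {
        this.batchSize = batchSize;
        this.batch = new ArrayList<>(batchSize);
    }

    public void addRow(String row) {
        batch.add(row);
        if (batch.size() == batchSize) {
            flush(); // automatic flush prevents unbounded memory growth
        }
    }

    public void flush() {
        if (!batch.isEmpty()) {
            flushCount++;  // a real writer would emit the batch here
            batch.clear(); // the batch object itself is reused
        }
    }

    public int getFlushCount() {
        return flushCount;
    }

    public static void main(String[] args) {
        RowBatchBuffer buffer = new RowBatchBuffer(1024);
        for (int i = 0; i < 2500; i++) {
            buffer.addRow("row-" + i);
        }
        buffer.flush(); // drain the final partial batch (452 rows)
        System.out.println(buffer.getFlushCount()); // 3
    }
}
```

Because memory use is bounded by the batch capacity rather than the row count, the same buffer handles arbitrarily long streams.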

## Error Handling

Common exceptions and handling strategies:

```java
try {
    BulkWriter<RowData> writer = factory.create(outputStream);
    writer.addElement(rowData);
    writer.finish();
} catch (IOException e) {
    // Handle file system errors, ORC format errors, or write failures
    logger.error("Failed to write ORC data", e);
} catch (UnsupportedOperationException e) {
    // Handle unsupported data types
    logger.error("Unsupported data type in schema", e);
}
```