Factory and implementation for creating ORC bulk writers that efficiently write Flink RowData to ORC files without Hive dependencies. Provides high-performance batch writing with automatic type conversion and memory management.
Factory class that creates BulkWriter instances for writing RowData to ORC files using the no-hive ORC implementation.
/**
* Factory for creating ORC bulk writers without Hive dependencies
* Implements BulkWriter.Factory<RowData> for integration with Flink's file sinks
*/
public class OrcNoHiveBulkWriterFactory implements BulkWriter.Factory<RowData> {
/**
* Creates a new ORC bulk writer factory
* @param conf Hadoop configuration for ORC file settings
* @param schema ORC schema string (e.g., "struct<name:string,age:int>")
* @param fieldTypes Array of Flink logical types matching the schema fields
*/
public OrcNoHiveBulkWriterFactory(Configuration conf, String schema, LogicalType[] fieldTypes);
/**
* Creates a BulkWriter instance for the given output stream
* @param out Output stream to write ORC data to
* @return BulkWriter instance for writing RowData
* @throws IOException if writer creation fails
*/
public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException;
}

Usage Examples:
import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.nohive.OrcNoHiveBulkWriterFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;
// Define schema and types for a user record
String orcSchema = "struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>";
LogicalType[] fieldTypes = {
new BigIntType(),
new VarCharType(255),
new VarCharType(255),
new IntType(),
new DecimalType(10, 2)
};
// Create factory with Hadoop configuration
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("orc.compress", "ZLIB"); // Optional compression
hadoopConfig.setInt("orc.row.batch.size", 1024); // Optional batch size
OrcNoHiveBulkWriterFactory factory = new OrcNoHiveBulkWriterFactory(
hadoopConfig,
orcSchema,
fieldTypes
);
// Use with StreamingFileSink (bulk formats can only roll parts on checkpoint,
// so a checkpoint-based rolling policy is used instead of a size-based one)
StreamingFileSink<RowData> sink = StreamingFileSink
    .forBulkFormat(outputPath, factory)
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();
dataStream.addSink(sink);
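On newer Flink releases, the unified FileSink can be used in place of the deprecated StreamingFileSink. A minimal sketch, assuming the flink-connector-files dependency is available; the output path is illustrative:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Bulk formats roll parts on checkpoint by default, so no rolling policy is required here
FileSink<RowData> fileSink = FileSink
    .forBulkFormat(new Path("hdfs:///data/orc-output"), factory)   // hypothetical path
    .build();
dataStream.sinkTo(fileSink);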
// Complex nested schema example
String complexSchema = "struct<" +
"user_id:bigint," +
"profile:struct<name:string,bio:string>," +
"tags:array<string>," +
"metrics:map<string,double>" +
">";
LogicalType[] complexTypes = {
new BigIntType(),
RowType.of(new VarCharType(100), new VarCharType(500)),
new ArrayType(new VarCharType(50)),
new MapType(new VarCharType(50), new DoubleType())
};
OrcNoHiveBulkWriterFactory complexFactory = new OrcNoHiveBulkWriterFactory(
hadoopConfig,
complexSchema,
complexTypes
);
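For reference, a RowData value matching this nested schema can be assembled with Flink's generic data structures. A small sketch (field order follows the schema above; the values are invented):

import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import java.util.HashMap;
import java.util.Map;

GenericRowData row = new GenericRowData(4);
row.setField(0, 42L);                                        // user_id: bigint

GenericRowData profile = new GenericRowData(2);              // profile: struct<name,bio>
profile.setField(0, StringData.fromString("Alice"));
profile.setField(1, StringData.fromString("Streaming enthusiast"));
row.setField(1, profile);

row.setField(2, new GenericArrayData(new Object[] {          // tags: array<string>
    StringData.fromString("flink"), StringData.fromString("orc")}));

Map<StringData, Double> metrics = new HashMap<>();           // metrics: map<string,double>
metrics.put(StringData.fromString("clicks"), 12.0);
row.setField(3, new GenericMapData(metrics));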
The factory creates BulkWriter instances with the following interface (Flink's generic BulkWriter&lt;T&gt;, used here with T = RowData):

/**
* BulkWriter interface for writing RowData to ORC files
* Created by OrcNoHiveBulkWriterFactory.create()
*/
interface BulkWriter<RowData> {
/**
* Add a single RowData element to the ORC file
* @param row RowData instance to write
* @throws IOException if write operation fails
*/
void addElement(RowData row) throws IOException;
/**
* Flush any buffered data to the output stream
* @throws IOException if flush operation fails
*/
void flush() throws IOException;
/**
* Finish writing and close the ORC file
* @throws IOException if close operation fails
*/
void finish() throws IOException;
}
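Outside of a Flink job (for example in a test), a writer can also be driven directly against a Flink FSDataOutputStream. A minimal sketch, assuming the factory from the examples above; the path and row values are illustrative:

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import java.math.BigDecimal;

Path path = new Path("file:///tmp/users.orc");               // hypothetical output file
FileSystem fs = path.getFileSystem();
try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE)) {
    BulkWriter<RowData> writer = factory.create(out);
    writer.addElement(GenericRowData.of(                     // one row per addElement() call
        1L,
        StringData.fromString("Ada"),
        StringData.fromString("ada@example.com"),
        30,
        DecimalData.fromBigDecimal(new BigDecimal("1234.56"), 10, 2)));
    writer.flush();                                          // push buffered data to the stream
    writer.finish();                                         // write the ORC footer; does not close the stream
}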
Internal implementation that handles ORC file writing with relocated Protobuf classes for no-hive compatibility.

/**
* Physical writer implementation for ORC files without Hive dependencies
* Handles relocated Protobuf classes in orc-core-nohive
*/
public class NoHivePhysicalWriterImpl extends PhysicalWriterImpl {
/**
* Creates a new no-hive physical writer
* @param out Output stream to write to
* @param opts ORC writer options and configuration
* @throws IOException if writer initialization fails
*/
public NoHivePhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) throws IOException;
/**
* Write ORC metadata using relocated protobuf classes
* @param metadata ORC metadata to write
* @throws IOException if write operation fails
*/
protected void writeMetadata(OrcProto.Metadata metadata) throws IOException;
/**
* Write ORC file footer using relocated protobuf classes
* @param footer ORC file footer to write
* @throws IOException if write operation fails
*/
protected void writeFileFooter(OrcProto.Footer footer) throws IOException;
/**
* Write ORC stripe footer using relocated protobuf classes
* @param footer ORC stripe footer to write
* @throws IOException if write operation fails
*/
protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException;
}

The bulk writer automatically converts Flink logical types to ORC column vectors:
| Flink Type | ORC Vector Type | Conversion Notes |
|---|---|---|
| BOOLEAN | LongColumnVector | 1 for true, 0 for false |
| TINYINT, SMALLINT, INTEGER, BIGINT | LongColumnVector | Direct mapping |
| FLOAT, DOUBLE | DoubleColumnVector | Direct mapping |
| CHAR, VARCHAR | BytesColumnVector | UTF-8 encoded |
| BINARY, VARBINARY | BytesColumnVector | Direct byte array |
| DECIMAL | DecimalColumnVector | Uses HiveDecimal for precision |
| DATE | LongColumnVector | Days since epoch |
| TIMESTAMP_* | TimestampColumnVector | Millisecond time plus nanosecond component |
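For the date and time rows above, the values on the Flink side are an int day count and a TimestampData. A short sketch (the schema and field values here are purely illustrative):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import java.time.LocalDate;
import java.time.LocalDateTime;

// Illustrative schema: struct<event_date:date,event_time:timestamp(3)>
RowData row = GenericRowData.of(
    (int) LocalDate.of(2024, 1, 15).toEpochDay(),                            // DATE: days since epoch
    TimestampData.fromLocalDateTime(LocalDateTime.of(2024, 1, 15, 10, 30))); // TIMESTAMP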
Configure the ORC writer through Hadoop Configuration:
Configuration config = new Configuration();
// Compression settings
config.set("orc.compress", "ZLIB"); // NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
config.set("orc.compress.size", "262144"); // 256KB compression blocks
// Performance settings
config.setInt("orc.row.batch.size", 1024); // Rows per batch
config.setInt("orc.stripe.size", 67108864); // 64MB stripes
config.setBoolean("orc.use.zerocopy", true); // Enable zero-copy reads
// Memory settings
config.setDouble("orc.dictionary.key.threshold", 0.8); // Dictionary encoding threshold

The bulk writer manages memory efficiently by reusing a single batch of column vectors per writer: incoming rows fill the batch, and the batch is handed to the ORC writer and reset once it is full (or when flush() is called), so memory use stays bounded by the batch size rather than by the number of rows written.
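Conceptually, the batching looks roughly like the sketch below (a simplification, not the actual implementation; class names are those of stock orc-core, while the no-hive artifact relocates the vector classes):

import org.apache.flink.table.data.RowData;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import java.io.IOException;

class BatchingSketch {
    private final Writer writer;                  // underlying ORC writer (assumed already created)
    private final VectorizedRowBatch rowBatch;    // one reusable batch; bounds memory per writer

    BatchingSketch(Writer writer, TypeDescription schema) {
        this.writer = writer;
        this.rowBatch = schema.createRowBatch();  // default capacity is 1024 rows
    }

    void addRow(RowData row) throws IOException {
        int rowId = rowBatch.size++;
        // ... populate each ColumnVector at rowId from the RowData fields ...
        if (rowBatch.size == rowBatch.getMaxSize()) {
            flushBatch();
        }
    }

    void flushBatch() throws IOException {
        if (rowBatch.size > 0) {
            writer.addRowBatch(rowBatch);         // hand the filled batch to the ORC writer
            rowBatch.reset();                     // reuse the same column vectors afterwards
        }
    }
}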
Common exceptions and handling strategies:
try {
BulkWriter<RowData> writer = factory.create(outputStream);
writer.addElement(rowData);
writer.finish();
} catch (IOException e) {
// Handle file system errors, ORC format errors, or write failures
logger.error("Failed to write ORC data", e);
} catch (UnsupportedOperationException e) {
// Handle unsupported data types
logger.error("Unsupported data type in schema", e);
}