or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

bulk-writing.md
columnar-reading.md
index.md
orc-integration.md
vector-processing.md
tile.json

docs/bulk-writing.md

Bulk Writing

Factory and implementation for creating ORC bulk writers that efficiently write Flink RowData to ORC files without Hive dependencies. Provides high-performance batch writing with automatic type conversion and memory management.

Capabilities

ORC Bulk Writer Factory

Factory class that creates BulkWriter instances for writing RowData to ORC files using the no-hive ORC implementation.

/**
 * Factory for creating ORC bulk writers without Hive dependencies
 * Implements BulkWriter.Factory<RowData> for integration with Flink's file sinks
 */
public class OrcNoHiveBulkWriterFactory implements BulkWriter.Factory<RowData> {
    
    /**
     * Creates a new ORC bulk writer factory
     * @param conf Hadoop configuration for ORC file settings
     * @param schema ORC schema string (e.g., "struct<name:string,age:int>")
     * @param fieldTypes Array of Flink logical types matching the schema fields
     */
    public OrcNoHiveBulkWriterFactory(Configuration conf, String schema, LogicalType[] fieldTypes);
    
    /**
     * Creates a BulkWriter instance for the given output stream
     * @param out Output stream to write ORC data to
     * @return BulkWriter instance for writing RowData
     * @throws IOException if writer creation fails
     */
    public BulkWriter<RowData> create(FSDataOutputStream out) throws IOException;
}

Usage Examples:

import org.apache.flink.orc.nohive.OrcNoHiveBulkWriterFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;
import org.apache.hadoop.conf.Configuration;

// Define schema and types for a user record
String orcSchema = "struct<id:bigint,name:string,email:string,age:int,salary:decimal(10,2)>";
LogicalType[] fieldTypes = {
    new BigIntType(),
    new VarCharType(255),
    new VarCharType(255), 
    new IntType(),
    new DecimalType(10, 2)
};

// Create factory with Hadoop configuration
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("orc.compress", "ZLIB");  // Optional compression
hadoopConfig.setInt("orc.row.batch.size", 1024);  // Optional batch size

OrcNoHiveBulkWriterFactory factory = new OrcNoHiveBulkWriterFactory(
    hadoopConfig,
    orcSchema,
    fieldTypes
);

// Use with StreamingFileSink
StreamingFileSink<RowData> sink = StreamingFileSink
    .forBulkFormat(outputPath, factory)
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(128 * 1024 * 1024)  // 128MB files
        .build())
    .build();

dataStream.addSink(sink);
// Complex nested schema example
String complexSchema = "struct<" +
    "user_id:bigint," +
    "profile:struct<name:string,bio:string>," +
    "tags:array<string>," +
    "metrics:map<string,double>" +
    ">";

LogicalType[] complexTypes = {
    new BigIntType(),
    RowType.of(new VarCharType(100), new VarCharType(500)),
    new ArrayType(new VarCharType(50)),
    new MapType(new VarCharType(50), new DoubleType())
};

OrcNoHiveBulkWriterFactory complexFactory = new OrcNoHiveBulkWriterFactory(
    hadoopConfig,
    complexSchema, 
    complexTypes
);

BulkWriter Interface

The factory creates BulkWriter instances with the following interface:

/**
 * BulkWriter interface for writing RowData to ORC files
 * Created by OrcNoHiveBulkWriterFactory.create()
 */
interface BulkWriter<RowData> {
    /**
     * Add a single RowData element to the ORC file
     * @param row RowData instance to write
     * @throws IOException if write operation fails
     */
    void addElement(RowData row) throws IOException;
    
    /**
     * Flush any buffered data to the output stream
     * @throws IOException if flush operation fails
     */
    void flush() throws IOException;
    
    /**
     * Finish writing and close the ORC file
     * @throws IOException if close operation fails
     */
    void finish() throws IOException;
}

Physical Writer Implementation

Internal implementation that handles ORC file writing with relocated Protobuf classes for no-hive compatibility.

/**
 * Physical writer implementation for ORC files without Hive dependencies
 * Handles relocated Protobuf classes in orc-core-nohive
 */
public class NoHivePhysicalWriterImpl extends PhysicalWriterImpl {
    
    /**
     * Creates a new no-hive physical writer
     * @param out Output stream to write to
     * @param opts ORC writer options and configuration
     * @throws IOException if writer initialization fails
     */
    public NoHivePhysicalWriterImpl(FSDataOutputStream out, OrcFile.WriterOptions opts) throws IOException;
    
    /**
     * Write ORC metadata using relocated protobuf classes
     * @param metadata ORC metadata to write
     * @throws IOException if write operation fails
     */
    protected void writeMetadata(OrcProto.Metadata metadata) throws IOException;
    
    /**
     * Write ORC file footer using relocated protobuf classes
     * @param footer ORC file footer to write
     * @throws IOException if write operation fails
     */
    protected void writeFileFooter(OrcProto.Footer footer) throws IOException;
    
    /**
     * Write ORC stripe footer using relocated protobuf classes
     * @param footer ORC stripe footer to write  
     * @throws IOException if write operation fails
     */
    protected void writeStripeFooter(OrcProto.StripeFooter footer) throws IOException;
}

Type Conversion

The bulk writer automatically converts Flink logical types to ORC column vectors:

| Flink Type | ORC Vector Type | Conversion Notes |
|---|---|---|
| BOOLEAN | LongColumnVector | 1 for true, 0 for false |
| TINYINT, SMALLINT, INTEGER, BIGINT | LongColumnVector | Direct mapping |
| FLOAT, DOUBLE | DoubleColumnVector | Direct mapping |
| CHAR, VARCHAR | BytesColumnVector | UTF-8 encoded |
| BINARY, VARBINARY | BytesColumnVector | Direct byte array |
| DECIMAL | DecimalColumnVector | Uses HiveDecimal for precision |
| DATE | LongColumnVector | Days since epoch |
| TIMESTAMP_* | TimestampColumnVector | Microsecond precision |

Configuration Options

Configure the ORC writer through Hadoop Configuration:

Configuration config = new Configuration();

// Compression settings
config.set("orc.compress", "ZLIB");  // NONE, ZLIB, SNAPPY, LZO, LZ4, ZSTD
config.set("orc.compress.size", "262144");  // 256KB compression blocks

// Performance settings  
config.setInt("orc.row.batch.size", 1024);  // Rows per batch
config.setInt("orc.stripe.size", 67108864);  // 64MB stripes
config.setBoolean("orc.use.zerocopy", true);  // Enable zero-copy reads

// Memory settings
config.setDouble("orc.dictionary.key.threshold", 0.8);  // Dictionary encoding threshold

Memory Management

The bulk writer manages memory efficiently through:

  • Batch Processing: Writes data in configurable batch sizes (default 1024 rows)
  • Automatic Flushing: Flushes batches when full to prevent memory buildup
  • Stream Management: Properly closes underlying streams and releases resources
  • Vector Reuse: Reuses vectorized row batches across write operations

Error Handling

Common exceptions and handling strategies:

try {
    BulkWriter<RowData> writer = factory.create(outputStream);
    writer.addElement(rowData);
    writer.finish();
} catch (IOException e) {
    // Handle file system errors, ORC format errors, or write failures
    logger.error("Failed to write ORC data", e);
} catch (UnsupportedOperationException e) {
    // Handle unsupported data types
    logger.error("Unsupported data type in schema", e);
}