CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-orc

Apache Flink ORC format connector for reading and writing ORC (Optimized Row Columnar) data files

Pending
Overview
Eval results
Files

docs/bulk-writing.md

Bulk Writing

The ORC format provides high-performance bulk writing capabilities through the OrcBulkWriterFactory and custom Vectorizer implementations. This enables efficient writing of large datasets to ORC files with full control over the vectorization process.

Writer Factory

/**
 * Factory that creates {@code BulkWriter} instances for elements of type {@code T},
 * using a {@code Vectorizer<T>} to turn elements into ORC row batches.
 *
 * <p>API listing only: method bodies are omitted.
 *
 * @param <T> type of the elements handed to the writers this factory creates
 */
@PublicEvolving
public class OrcBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    /** Creates a factory from a vectorizer only. */
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer);
    /** Creates a factory from a vectorizer and a Hadoop {@code Configuration}. */
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Configuration configuration);
    /** Creates a factory from a vectorizer, ORC writer properties and a Hadoop {@code Configuration}. */
    public OrcBulkWriterFactory(Vectorizer<T> vectorizer, Properties writerProperties, Configuration configuration);
    
    /**
     * Creates a writer that writes to the given output stream.
     *
     * @param out stream the ORC data is written to
     * @return a new {@code BulkWriter} for elements of type {@code T}
     * @throws IOException if the writer cannot be created
     */
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException;
    
    /**
     * Returns the ORC writer options. Exposed for tests only.
     * NOTE(review): presumably derived from the writer properties and configuration
     * passed to the constructor - confirm against the implementation.
     */
    @VisibleForTesting
    protected OrcFile.WriterOptions getWriterOptions();
}

Factory Method Usage:

import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;

// Demonstrates the three OrcBulkWriterFactory constructors.
// NOTE: `schema` (ORC schema string) and `fieldTypes` (LogicalType[]) are not defined in
// this fragment; they must be declared beforehand.

// Simple factory with vectorizer only
RowDataVectorizer vectorizer = new RowDataVectorizer(schema, fieldTypes);
OrcBulkWriterFactory<RowData> simpleFactory = new OrcBulkWriterFactory<>(vectorizer);

// Factory with Hadoop configuration
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("fs.defaultFS", "hdfs://namenode:8020");
OrcBulkWriterFactory<RowData> configuredFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    hadoopConfig
);

// Factory with writer properties and configuration
Properties writerProps = new Properties();
writerProps.setProperty("orc.compress", "SNAPPY");
// 134217728 bytes = 128 MB
writerProps.setProperty("orc.stripe.size", "134217728");
OrcBulkWriterFactory<RowData> fullFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    writerProps,
    hadoopConfig
);

Vectorizer Base Class

/**
 * Base class for converting elements of type {@code T} into ORC
 * {@code VectorizedRowBatch} rows. Subclasses implement {@link #vectorize}.
 *
 * <p>API listing only: method bodies are omitted.
 *
 * @param <T> type of the elements to vectorize
 */
@PublicEvolving
public abstract class Vectorizer<T> implements Serializable {
    /**
     * @param schema ORC schema as a string, e.g. {@code "struct<id:bigint,name:string>"}
     */
    public Vectorizer(String schema);
    
    /** Returns the schema as an ORC {@code TypeDescription}. */
    public TypeDescription getSchema();
    /**
     * Sets the ORC {@code Writer} associated with this vectorizer.
     * NOTE(review): presumably called by the bulk writer, not by user code - confirm.
     */
    public void setWriter(Writer writer);
    /**
     * Adds a user metadata entry under the given key.
     * NOTE(review): presumably forwarded to the writer set via setWriter - confirm.
     */
    public void addUserMetadata(String key, ByteBuffer value);
    
    /**
     * Writes one element into the given row batch.
     *
     * @param element the element to convert
     * @param batch the batch that receives the element's column values
     * @throws IOException if the element cannot be vectorized
     */
    public abstract void vectorize(T element, VectorizedRowBatch batch) throws IOException;
}

RowData Vectorizer

/**
 * {@code Vectorizer} implementation for {@code RowData} elements.
 *
 * <p>API listing only: method bodies are omitted.
 */
public class RowDataVectorizer extends Vectorizer<RowData> {
    /**
     * @param schema ORC schema string describing the columns
     * @param fieldTypes Flink logical type of each column
     *     (presumably in the same order as the schema's fields - confirm)
     */
    public RowDataVectorizer(String schema, LogicalType[] fieldTypes);
    
    /** Writes the given row into the batch. */
    public void vectorize(RowData row, VectorizedRowBatch batch);
}

Usage Examples

Basic RowData Writing

import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.orc.vector.RowDataVectorizer;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.*;

// End-to-end example: declare the column types twice (as Flink logical types and as an
// ORC schema string), build a vectorizer and factory, and attach a bulk-format file sink.

// Define schema and field types
LogicalType[] fieldTypes = {
    new BigIntType(),           // id
    new VarCharType(255),       // name  
    new IntType(),              // age
    new DecimalType(10, 2),     // salary
    new BooleanType(),          // active
    new TimestampType(3)        // created_at
};

// The ORC schema lists the same six columns, in the same order, as fieldTypes above.
String orcSchema = "struct<" +
    "id:bigint," +
    "name:string," +
    "age:int," +
    "salary:decimal(10,2)," +
    "active:boolean," +
    "created_at:timestamp" +
    ">";

// Create vectorizer
RowDataVectorizer vectorizer = new RowDataVectorizer(orcSchema, fieldTypes);

// Create writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(vectorizer);

// Use in streaming sink
// NOTE: the right-hand side below is a placeholder and must be replaced with a real stream.
// NOTE(review): DataStream and Path are not among the imports listed for this example;
// they need their own import statements.
DataStream<RowData> dataStream = // ... your data stream
dataStream.addSink(
    StreamingFileSink.forBulkFormat(
        new Path("/path/to/output"),
        writerFactory
    ).build()
);

Writing with Configuration

import org.apache.hadoop.conf.Configuration;
import java.util.Properties;

// Builds a factory with both ORC writer properties and a Hadoop configuration.
// NOTE: `vectorizer` is not defined in this fragment; it must be created beforehand.

// Configure ORC writer properties
Properties writerProperties = new Properties();
writerProperties.setProperty("orc.compress", "SNAPPY");
// 67108864 bytes = 64 MB
writerProperties.setProperty("orc.stripe.size", "67108864");
writerProperties.setProperty("orc.row.index.stride", "10000");

// Configure Hadoop settings
Configuration hadoopConfig = new Configuration();
hadoopConfig.set("orc.bloom.filter.columns", "name,id");
hadoopConfig.setFloat("orc.bloom.filter.fpp", 0.05f);

// Create configured writer factory
OrcBulkWriterFactory<RowData> writerFactory = new OrcBulkWriterFactory<>(
    vectorizer,
    writerProperties, 
    hadoopConfig
);

Complex Types Writing

// Declares a four-column layout with array, map and nested-struct columns, once as Flink
// logical types and once as the matching ORC schema string.
// NOTE(review): Arrays requires `import java.util.Arrays;`, which this fragment does not show.

// Schema with complex types
LogicalType[] complexFieldTypes = {
    new BigIntType(),                              // id
    new ArrayType(new VarCharType(100)),          // tags array
    new MapType(new VarCharType(50), new IntType()), // metrics map
    new RowType(Arrays.asList(                     // address struct
        new RowType.RowField("street", new VarCharType(200)),
        new RowType.RowField("city", new VarCharType(100)),
        new RowType.RowField("zip", new VarCharType(10))
    ))
};

// Same four columns, in the same order, expressed as an ORC schema.
String complexOrcSchema = "struct<" +
    "id:bigint," +
    "tags:array<string>," +
    "metrics:map<string,int>," +
    "address:struct<street:string,city:string,zip:string>" +
    ">";

RowDataVectorizer complexVectorizer = new RowDataVectorizer(complexOrcSchema, complexFieldTypes);

Custom Vectorizer Implementation

// Custom vectorizer for POJO classes
/**
 * Custom vectorizer that writes {@code User} POJOs into ORC row batches.
 *
 * <p>The schema has four columns, addressed by position in {@code batch.cols}:
 * {@code id:bigint}, {@code name:string}, {@code email:string}, {@code active:boolean}.
 */
public class UserVectorizer extends Vectorizer<User> {

    /** Creates the vectorizer with the fixed four-column user schema. */
    public UserVectorizer() {
        super("struct<id:bigint,name:string,email:string,active:boolean>");
    }

    /**
     * Appends one user as the next row of the batch.
     *
     * <p>Assumes {@code user.getName()} and {@code user.getEmail()} are non-null; a null
     * value fails with a {@code NullPointerException}.
     *
     * @param user the element to write
     * @param batch the row batch to append to; its {@code size} is incremented by one
     * @throws IOException declared by the base class contract
     */
    @Override
    public void vectorize(User user, VectorizedRowBatch batch) throws IOException {
        // Claim the next free row; the post-increment also advances the batch size.
        int rowId = batch.size++;

        // Column order must match the schema string passed to the constructor.
        LongColumnVector idColumn = (LongColumnVector) batch.cols[0];
        BytesColumnVector nameColumn = (BytesColumnVector) batch.cols[1];
        BytesColumnVector emailColumn = (BytesColumnVector) batch.cols[2];
        LongColumnVector activeColumn = (LongColumnVector) batch.cols[3];

        idColumn.vector[rowId] = user.getId();

        // Strings are stored as UTF-8 bytes.
        nameColumn.setVal(rowId, user.getName().getBytes(StandardCharsets.UTF_8));
        emailColumn.setVal(rowId, user.getEmail().getBytes(StandardCharsets.UTF_8));

        // The boolean column is backed by a long vector: 1 = true, 0 = false.
        activeColumn.vector[rowId] = user.isActive() ? 1 : 0;

        // Add custom metadata. An explicit charset keeps the stored bytes independent of
        // the JVM's platform default encoding, consistent with the columns above.
        if (user.hasSpecialAttribute()) {
            addUserMetadata(
                "special_users",
                ByteBuffer.wrap("true".getBytes(StandardCharsets.UTF_8)));
        }
    }
}

// Usage: plug the custom vectorizer into a writer factory typed for User elements.
UserVectorizer userVectorizer = new UserVectorizer();
OrcBulkWriterFactory<User> userWriterFactory = new OrcBulkWriterFactory<>(userVectorizer);

Bulk Writer Implementation

/**
 * {@code BulkWriter} implementation that performs the actual ORC writing.
 * Marked {@code @Internal}: not part of the public API.
 *
 * <p>API listing only: method bodies are omitted.
 *
 * @param <T> type of the elements written
 */
@Internal
public class OrcBulkWriter<T> implements BulkWriter<T> {
    /** Vectorizes and buffers the element, writing a batch out when it is full. */
    public void addElement(T element) throws IOException;
    /** Forces any pending batched data to be written. */
    public void flush() throws IOException;
    /** Finalizes and closes the ORC file. */
    public void finish() throws IOException;
}

The OrcBulkWriter handles the actual writing process:

  • addElement(): Vectorizes and buffers elements, writing batches when full
  • flush(): Forces write of any pending batched data
  • finish(): Finalizes and closes the ORC file

Performance Considerations

Batch Size Optimization

// NOTE: `schema` is not defined in this fragment; it is presumably an ORC
// TypeDescription (e.g. the value returned by Vectorizer.getSchema()) - confirm.
// ORC uses VectorizedRowBatch with default size
VectorizedRowBatch batch = schema.createRowBatch(); // Default: 1024 rows
VectorizedRowBatch customBatch = schema.createRowBatch(2048); // Custom size

Compression Settings

// The lines below list alternative values for the same key. They are NOT meant to be
// used together: each setProperty call overwrites the previous one, so running this
// fragment as written leaves "orc.compress" set to "NONE". Keep only the line you need.
Properties props = new Properties();
props.setProperty("orc.compress", "SNAPPY");     // Fast compression
props.setProperty("orc.compress", "ZLIB");       // Better compression ratio
props.setProperty("orc.compress", "ZSTD");       // Best compression ratio
props.setProperty("orc.compress", "NONE");       // No compression

Stripe Size Configuration

// Writer properties controlling stripe size and row-index granularity.
// Property values are passed as strings; the stripe size is given in bytes.
Properties props = new Properties();
props.setProperty("orc.stripe.size", "134217728"); // 128MB stripes
props.setProperty("orc.row.index.stride", "20000"); // Index every 20k rows

Writer Lifecycle

  1. Factory Creation: OrcBulkWriterFactory configured with vectorizer and options
  2. Writer Instantiation: create() method called with output stream
  3. Element Processing: addElement() vectorizes and batches data
  4. Batch Writing: Full batches automatically written to ORC file
  5. Finalization: finish() flushes remaining data and closes file

Error Handling

// Writer creation can fail with an IOException, which is logged here.
// NOTE: this fragment discards the BulkWriter returned by create(); real code must keep
// the reference in order to add elements and finish the file.
// NOTE: `writerFactory`, `outputStream` and `log` are not defined in this fragment.
try {
    writerFactory.create(outputStream);
} catch (IOException e) {
    // Handle writer creation errors
    log.error("Failed to create ORC writer", e);
}

// Vectorizer error handling
// Method fragment meant to live inside a Vectorizer<MyData> subclass. Any exception raised
// by the vectorization logic is rethrown as an IOException that keeps the original
// exception as its cause and includes the failing element in the message.
@Override
public void vectorize(MyData data, VectorizedRowBatch batch) throws IOException {
    try {
        // Vectorization logic
    } catch (Exception e) {
        throw new IOException("Failed to vectorize data: " + data, e);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-orc

docs

bulk-writing.md

columnar-reading.md

index.md

predicate-pushdown.md

table-api.md

vector-processing.md

tile.json